diff --git a/extropy/core/models/simulation.py b/extropy/core/models/simulation.py index 2a3ceaa..0173358 100644 --- a/extropy/core/models/simulation.py +++ b/extropy/core/models/simulation.py @@ -310,6 +310,10 @@ class ReasoningContext(BaseModel): background_context: str | None = Field( default=None, description="Scenario-level background context" ) + identity_threat_summary: str | None = Field( + default=None, + description="Deterministic identity-relevance framing when scenario content threatens group identity", + ) agent_names: dict[str, str] = Field( default_factory=dict, description="Mapping of agent_id → first name for resolving peer references", @@ -357,6 +361,10 @@ class ReasoningResponse(BaseModel): position: str | None = Field( default=None, description="Classified position (filled by Pass 2)" ) + public_position: str | None = Field( + default=None, + description="Public-facing position when THINK/SAY diverges (high fidelity)", + ) sentiment: float | None = Field( default=None, description="Sentiment value (-1 to 1)" ) diff --git a/extropy/simulation/engine.py b/extropy/simulation/engine.py index 850efcc..ddfca6e 100644 --- a/extropy/simulation/engine.py +++ b/extropy/simulation/engine.py @@ -84,6 +84,7 @@ _BOUNDED_CONFIDENCE_RHO = 0.35 _PRIVATE_ADJUSTMENT_RHO = 0.12 _PRIVATE_FLIP_CONVICTION = CONVICTION_MAP[ConvictionLevel.FIRM] +_IDENTITY_VALUE_SENTINELS = {"", "unknown", "none", "n/a", "na", "null"} class _StateTimelineAdapter: @@ -926,7 +927,8 @@ def _process_reasoning_chunk( public_conviction = max(0.0, min(1.0, public_conviction)) public_will_share = response.will_share - public_position = response.position + candidate_public_position = response.public_position or response.position + public_position = candidate_public_position if ( old_public_conviction is not None @@ -934,8 +936,8 @@ def _process_reasoning_chunk( ): if ( old_public_position is not None - and response.position is not None - and old_public_position != response.position + and candidate_public_position is not None + and old_public_position != candidate_public_position ): new_conviction = ( public_conviction if public_conviction is not None else 0.0 @@ -943,7 +945,7 @@ def _process_reasoning_chunk( if new_conviction < _MODERATE_CONVICTION: logger.info( f"[CONVICTION] Agent {agent_id}: public flip from {old_public_position} " - f"to {response.position} rejected (old conviction={float_to_conviction(old_public_conviction)}, " + f"to {candidate_public_position} rejected (old conviction={float_to_conviction(old_public_conviction)}, " f"new conviction={float_to_conviction(public_conviction)})" ) public_position = old_public_position @@ -1447,6 +1449,9 @@ def _build_reasoning_context( ctx.macro_summary = macro_summary ctx.local_mood_summary = local_mood_summary ctx.background_context = self.scenario.background_context + ctx.identity_threat_summary = self._render_identity_threat_context( + agent, timestep + ) ctx.agent_names = self._agent_names # Populate Phase C fields @@ -1476,6 +1481,194 @@ def _build_reasoning_context( return ctx + @staticmethod + def _identity_value(agent: dict[str, Any], keys: tuple[str, ...]) -> str | None: + """Return the first meaningful identity value from candidate attribute keys.""" + for key in keys: + value = agent.get(key) + if value is None: + continue + if isinstance(value, bool): + if value: + return "yes" + continue + if isinstance(value, (list, tuple, set)): + if not value: + continue + return ", ".join(str(v) for v in value) + text = str(value).strip() + if text.lower() in _IDENTITY_VALUE_SENTINELS: + continue + return text.replace("_", " ") + return None + + def _render_identity_threat_context( + self, + agent: dict[str, Any], + timestep: int, + ) -> str | None: + """Render deterministic identity-relevance framing from scenario + agent data.""" + corpus_parts = [ + self.scenario.meta.description, + self.scenario.event.content, + self.scenario.background_context, + ] + if self.scenario.timeline: + for timeline_event in self.scenario.timeline: + if timeline_event.timestep <= timestep: + corpus_parts.append(timeline_event.event.content) + if timeline_event.description: + corpus_parts.append(timeline_event.description) + corpus = " ".join(str(part).lower() for part in corpus_parts if part).strip() + if not corpus: + return None + + def scenario_mentions(keywords: tuple[str, ...]) -> bool: + return any(keyword in corpus for keyword in keywords) + + identity_dimensions: list[str] = [] + + political_value = self._identity_value( + agent, + ("political_orientation", "political_ideology", "party_affiliation"), + ) + if political_value and scenario_mentions( + ( + "liberal", + "conservative", + "left", + "right", + "republican", + "democrat", + "politic", + "ideolog", + "culture war", + "censorship", + "book ban", + "school board", + " ban ", + ) + ): + identity_dimensions.append(f"political orientation ({political_value})") + + religious_value = self._identity_value( + agent, ("religious_affiliation", "religion", "faith_tradition") + ) + if religious_value and scenario_mentions( + ( + "religio", + "faith", + "church", + "mosque", + "temple", + "christian", + "muslim", + "jewish", + "moral", + ) + ): + identity_dimensions.append(f"religious affiliation ({religious_value})") + + race_value = self._identity_value(agent, ("race_ethnicity", "race", "ethnicity")) + if race_value and scenario_mentions( + ( + "race", + "racial", + "ethnic", + "minority", + "majority", + "immigra", + "inclusion", + "diversity", + "equity", + "civil rights", + "discrimination", + ) + ): + identity_dimensions.append(f"race/ethnicity ({race_value})") + + gender_value = self._identity_value( + agent, + ("gender_identity", "gender", "sexual_orientation"), + ) + if gender_value and scenario_mentions( + ( + "gender", + "women", + "woman", + "men", + "man", + "lgbt", + "lgbtq", + "queer", + "gay", + "lesbian", + "trans", + "transgender", + "sexual orientation", + "sexuality", + "pronoun", + ) + ): + identity_dimensions.append(f"gender/sexual identity ({gender_value})") + + parental_value = self._identity_value( + agent, ("parental_status", "household_role", "family_role") + ) + has_dependents = bool(agent.get("dependents")) + has_children_flag = bool(agent.get("has_children")) + if ( + scenario_mentions( + ( + "parent", + "parents", + "kid", + "kids", + "children", + "school", + "curriculum", + "classroom", + "book", + "library", + "parental rights", + ) + ) + and (parental_value or has_dependents or has_children_flag) + ): + if parental_value: + identity_dimensions.append(f"parent/family role ({parental_value})") + elif has_dependents: + identity_dimensions.append("parent/family role (caregiver)") + else: + identity_dimensions.append("parent/family role") + + citizenship_value = self._identity_value( + agent, ("citizenship_status", "nationality", "country_of_origin") + ) + if citizenship_value and scenario_mentions( + ( + "immigration", + "immigrant", + "border", + "citizenship", + "deport", + "national identity", + "patriot", + ) + ): + identity_dimensions.append(f"citizenship/national identity ({citizenship_value})") + + if not identity_dimensions: + return None + + framed_dimensions = "; ".join(identity_dimensions) + return ( + "This development can feel identity-relevant, not just practical. " + f"Parts of who I am that may feel implicated: {framed_dimensions}. " + "If it feels personal, acknowledge that in both your internal reaction " + "and what you choose to say publicly." + ) + def _get_peer_opinions(self, agent_id: str) -> list[PeerOpinion]: """Get opinions of connected peers who have visibly shared. @@ -2041,6 +2234,7 @@ def _export_results(self) -> None: "population_size": len(self.agents), "strong_model": self.config.strong, "fast_model": self.config.fast, + "fidelity": self.config.fidelity, "seed": self.seed, "multi_touch_threshold": self.config.multi_touch_threshold, "completed_at": datetime.now().isoformat(), diff --git a/extropy/simulation/reasoning.py b/extropy/simulation/reasoning.py index 1cba318..7e0c293 100644 --- a/extropy/simulation/reasoning.py +++ b/extropy/simulation/reasoning.py @@ -117,6 +117,17 @@ def build_pass1_prompt( ] ) + # --- Identity relevance framing --- + if context.identity_threat_summary: + prompt_parts.extend( + [ + "## Identity Relevance", + "", + context.identity_threat_summary, + "", + ] + ) + # --- Exposure history (named + experiential) --- prompt_parts.extend(["## How This Reached You", ""]) @@ -754,6 +765,107 @@ def _sentiment_to_tone(sentiment: float) -> str: return "strongly opposed" +async def _classify_text_async( + text: str, + scenario: ScenarioSpec, + pass2_schema: dict[str, Any], + classify_model: str | None, + config: SimulationRunConfig, + context: ReasoningContext, + rate_limiter: Any = None, +) -> tuple[dict[str, Any], TokenUsage]: + """Classify free text into structured outcomes using Pass 2 schema.""" + if not text.strip(): + return {}, TokenUsage() + + pass2_prompt = build_pass2_prompt(text, scenario) + usage = TokenUsage() + + for attempt in range(config.max_retries): + try: + if rate_limiter: + estimated_input = len(pass2_prompt) // 4 + estimated_output = 80 + await rate_limiter.routine.acquire( + estimated_input_tokens=estimated_input, + estimated_output_tokens=estimated_output, + ) + + call_start = time.time() + pass2_response, usage = await asyncio.wait_for( + simple_call_async( + prompt=pass2_prompt, + response_schema=pass2_schema, + schema_name="classification", + model=classify_model, + ), + timeout=20.0, + ) + call_elapsed = time.time() - call_start + logger.info(f"[PASS2] Agent {context.agent_id} - {call_elapsed:.2f}s") + + if pass2_response: + return dict(pass2_response), usage + except asyncio.TimeoutError: + logger.warning( + f"[PASS2] Agent {context.agent_id} - attempt {attempt + 1} timed out after 20s" + ) + if attempt == config.max_retries - 1: + logger.warning( + f"[PASS2] Agent {context.agent_id} - all retries exhausted, proceeding without classification" + ) + except Exception as e: + logger.warning( + f"[PASS2] Agent {context.agent_id} - attempt {attempt + 1} failed: {e}" + ) + if attempt == config.max_retries - 1: + logger.warning( + f"[PASS2] Agent {context.agent_id} - all retries exhausted, proceeding without classification" + ) + + return {}, usage + + +def _classify_text_sync( + text: str, + scenario: ScenarioSpec, + pass2_schema: dict[str, Any], + classify_model: str | None, + config: SimulationRunConfig, + context: ReasoningContext, +) -> dict[str, Any]: + """Synchronous counterpart for classifying text into outcomes.""" + if not text.strip(): + return {} + + pass2_prompt = build_pass2_prompt(text, scenario) + for attempt in range(config.max_retries): + try: + call_start = time.time() + pass2_response = simple_call( + prompt=pass2_prompt, + response_schema=pass2_schema, + schema_name="classification", + model=classify_model, + log=True, + ) + call_elapsed = time.time() - call_start + logger.info(f"[PASS2] Agent {context.agent_id} - API call took {call_elapsed:.2f}s") + + if pass2_response: + return dict(pass2_response) + except Exception as e: + logger.warning( + f"[PASS2] Agent {context.agent_id} - attempt {attempt + 1} failed: {e}" + ) + if attempt == config.max_retries - 1: + logger.warning( + f"[PASS2] Agent {context.agent_id} - classification failed, continuing without" + ) + + return {} + + # ============================================================================= # Two-pass reasoning (async) # ============================================================================= @@ -836,6 +948,7 @@ async def _reason_agent_two_pass_async( # Extract Pass 1 fields reasoning = pass1_response.get("reasoning", "") + private_thought = pass1_response.get("private_thought", "") public_statement = pass1_response.get("public_statement", "") reasoning_summary = pass1_response.get("reasoning_summary", "") sentiment = pass1_response.get("sentiment") @@ -851,60 +964,40 @@ async def _reason_agent_two_pass_async( # === Pass 2: Classification (if needed) === pass2_schema = build_pass2_schema(scenario.outcomes) position = None + public_position = None outcomes = {} - pass2_usage = TokenUsage() + pass2_usage_private = TokenUsage() + pass2_usage_public = TokenUsage() if pass2_schema: - pass2_prompt = build_pass2_prompt(reasoning, scenario) - - for attempt in range(config.max_retries): - try: - if rate_limiter: - # Dynamic token estimate from prompt length - estimated_input = len(pass2_prompt) // 4 - estimated_output = 80 # classification is small - await rate_limiter.routine.acquire( - estimated_input_tokens=estimated_input, - estimated_output_tokens=estimated_output, - ) - - call_start = time.time() - pass2_response, pass2_usage = await asyncio.wait_for( - simple_call_async( - prompt=pass2_prompt, - response_schema=pass2_schema, - schema_name="classification", - model=classify_model, - ), - timeout=20.0, - ) - call_elapsed = time.time() - call_start - - logger.info(f"[PASS2] Agent {context.agent_id} - {call_elapsed:.2f}s") + private_text = private_thought or reasoning + outcomes, pass2_usage_private = await _classify_text_async( + text=private_text, + scenario=scenario, + pass2_schema=pass2_schema, + classify_model=classify_model, + config=config, + context=context, + rate_limiter=rate_limiter, + ) + if position_outcome and position_outcome in outcomes: + position = outcomes[position_outcome] + + if config.fidelity == "high": + public_outcomes, pass2_usage_public = await _classify_text_async( + text=public_statement, + scenario=scenario, + pass2_schema=pass2_schema, + classify_model=classify_model, + config=config, + context=context, + rate_limiter=rate_limiter, + ) + if position_outcome and position_outcome in public_outcomes: + public_position = public_outcomes[position_outcome] - if pass2_response: - outcomes = dict(pass2_response) - # Extract primary position from outcomes - if position_outcome and position_outcome in pass2_response: - position = pass2_response[position_outcome] - break - except asyncio.TimeoutError: - logger.warning( - f"[PASS2] Agent {context.agent_id} - attempt {attempt + 1} timed out after 20s" - ) - if attempt == config.max_retries - 1: - logger.warning( - f"[PASS2] Agent {context.agent_id} - all retries exhausted, proceeding without classification" - ) - except Exception as e: - logger.warning( - f"[PASS2] Agent {context.agent_id} - attempt {attempt + 1} failed: {e}" - ) - if attempt == config.max_retries - 1: - # Pass 2 failure is non-fatal — we still have Pass 1 data - logger.warning( - f"[PASS2] Agent {context.agent_id} - all retries exhausted, proceeding without classification" - ) + if public_position is None: + public_position = position # Merge sentiment into outcomes for backwards compat if sentiment is not None: @@ -912,6 +1005,7 @@ async def _reason_agent_two_pass_async( return ReasoningResponse( position=position, + public_position=public_position, sentiment=sentiment, conviction=conviction_float, public_statement=public_statement, @@ -923,8 +1017,12 @@ async def _reason_agent_two_pass_async( actions=actions, pass1_input_tokens=pass1_usage.input_tokens, pass1_output_tokens=pass1_usage.output_tokens, - pass2_input_tokens=pass2_usage.input_tokens, - pass2_output_tokens=pass2_usage.output_tokens, + pass2_input_tokens=( + pass2_usage_private.input_tokens + pass2_usage_public.input_tokens + ), + pass2_output_tokens=( + pass2_usage_private.output_tokens + pass2_usage_public.output_tokens + ), ) @@ -1019,8 +1117,10 @@ async def _reason_agent_merged_async( # Extract position from outcomes position = None + public_position = None if position_outcome and position_outcome in response: position = response[position_outcome] + public_position = position # Build outcomes dict (everything except the Pass 1 fields) pass1_fields = { @@ -1038,8 +1138,27 @@ async def _reason_agent_merged_async( if sentiment is not None: outcomes["sentiment"] = sentiment + pass2_usage_public = TokenUsage() + pass2_schema = build_pass2_schema(scenario.outcomes) + if config.fidelity == "high" and pass2_schema: + public_outcomes, pass2_usage_public = await _classify_text_async( + text=public_statement, + scenario=scenario, + pass2_schema=pass2_schema, + classify_model=config.fast or None, + config=config, + context=context, + rate_limiter=rate_limiter, + ) + if position_outcome and position_outcome in public_outcomes: + public_position = public_outcomes[position_outcome] + + if public_position is None: + public_position = position + return ReasoningResponse( position=position, + public_position=public_position, sentiment=sentiment, conviction=conviction_float, public_statement=public_statement, @@ -1051,8 +1170,8 @@ async def _reason_agent_merged_async( actions=actions, pass1_input_tokens=usage.input_tokens, pass1_output_tokens=usage.output_tokens, - pass2_input_tokens=0, - pass2_output_tokens=0, + pass2_input_tokens=pass2_usage_public.input_tokens, + pass2_output_tokens=pass2_usage_public.output_tokens, ) @@ -1133,6 +1252,7 @@ def reason_agent( # Extract Pass 1 fields reasoning = pass1_response.get("reasoning", "") + private_thought = pass1_response.get("private_thought", "") public_statement = pass1_response.get("public_statement", "") reasoning_summary = pass1_response.get("reasoning_summary", "") sentiment = pass1_response.get("sentiment") @@ -1143,41 +1263,37 @@ def reason_agent( # === Pass 2: Classification === pass2_schema = build_pass2_schema(scenario.outcomes) position = None + public_position = None outcomes = {} if pass2_schema: - pass2_prompt = build_pass2_prompt(reasoning, scenario) + private_text = private_thought or reasoning classify_model = config.fast or None + outcomes = _classify_text_sync( + text=private_text, + scenario=scenario, + pass2_schema=pass2_schema, + classify_model=classify_model, + config=config, + context=context, + ) + if position_outcome and position_outcome in outcomes: + position = outcomes[position_outcome] + + if config.fidelity == "high": + public_outcomes = _classify_text_sync( + text=public_statement, + scenario=scenario, + pass2_schema=pass2_schema, + classify_model=classify_model, + config=config, + context=context, + ) + if position_outcome and position_outcome in public_outcomes: + public_position = public_outcomes[position_outcome] - for attempt in range(config.max_retries): - try: - call_start = time.time() - pass2_response = simple_call( - prompt=pass2_prompt, - response_schema=pass2_schema, - schema_name="classification", - model=classify_model, - log=True, - ) - call_elapsed = time.time() - call_start - - logger.info( - f"[PASS2] Agent {context.agent_id} - API call took {call_elapsed:.2f}s" - ) - - if pass2_response: - outcomes = dict(pass2_response) - if position_outcome and position_outcome in pass2_response: - position = pass2_response[position_outcome] - break - except Exception as e: - logger.warning( - f"[PASS2] Agent {context.agent_id} - attempt {attempt + 1} failed: {e}" - ) - if attempt == config.max_retries - 1: - logger.warning( - f"[PASS2] Agent {context.agent_id} - classification failed, continuing without" - ) + if public_position is None: + public_position = position if sentiment is not None: outcomes["sentiment"] = sentiment @@ -1189,6 +1305,7 @@ def reason_agent( return ReasoningResponse( position=position, + public_position=public_position, sentiment=sentiment, conviction=conviction_float, public_statement=public_statement, diff --git a/tests/test_engine.py b/tests/test_engine.py index 7068bb3..b04ac3b 100644 --- a/tests/test_engine.py +++ b/tests/test_engine.py @@ -298,6 +298,45 @@ def test_firm_agent_accepts_moderate_flip( assert final_state.public_position == "reject" assert final_state.position == "adopt" + def test_public_position_prefers_explicit_public_field( + self, + minimal_scenario, + simple_agents, + simple_network, + minimal_pop_spec, + tmp_path, + ): + """Public state should use response.public_position when provided.""" + config = SimulationRunConfig( + scenario_path="test.yaml", + output_dir=str(tmp_path / "output"), + ) + engine = SimulationEngine( + scenario=minimal_scenario, + population_spec=minimal_pop_spec, + agents=simple_agents, + network=simple_network, + config=config, + ) + + old_state = AgentState( + agent_id="a0", + position="undecided", + conviction=CONVICTION_MAP[ConvictionLevel.VERY_UNCERTAIN], + ) + response = _make_reasoning_response( + position="reject", + public_position="adopt", + conviction=CONVICTION_MAP[ConvictionLevel.MODERATE], + ) + + engine._process_reasoning_chunk( + timestep=1, results=[("a0", response)], old_states={"a0": old_state} + ) + + final_state = engine.state_manager.get_agent_state("a0") + assert final_state.public_position == "adopt" + class TestConvictionGatedSharing: """Test that very_uncertain agents don't share.""" @@ -1547,6 +1586,7 @@ def test_cost_in_meta_json( meta = json.load(f) assert "cost" in meta + assert meta["fidelity"] == config.fidelity cost = meta["cost"] assert cost["pivotal_input_tokens"] == 1_000_000 assert cost["pivotal_output_tokens"] == 500_000 @@ -1642,6 +1682,53 @@ def test_macro_summary_renders_waiting_momentum( text = engine._render_macro_summary(summary, prev_summary=prev_summary) assert "Most people are still watching and waiting" in text + def test_build_reasoning_context_adds_identity_threat_summary( + self, + minimal_scenario, + minimal_pop_spec, + tmp_path, + ): + minimal_scenario.event.content = ( + "The school board voted to remove books discussing race, gender identity, " + "and sexuality from public school libraries." + ) + minimal_scenario.background_context = ( + "Faith groups and parent organizations are publicly campaigning." + ) + agents = [ + { + "_id": "a0", + "first_name": "Alex", + "age": 38, + "political_orientation": "liberal", + "religious_affiliation": "catholic", + "race_ethnicity": "latino", + "gender": "male", + "dependents": [{"name": "Sam", "age": 10}], + } + ] + network = {"meta": {"node_count": 1}, "nodes": [{"id": "a0"}], "edges": []} + config = SimulationRunConfig( + scenario_path="test.yaml", + output_dir=str(tmp_path / "output"), + ) + engine = SimulationEngine( + scenario=minimal_scenario, + population_spec=minimal_pop_spec, + agents=agents, + network=network, + config=config, + ) + + state = engine.state_manager.get_agent_state("a0") + context = engine._build_reasoning_context("a0", state, timestep=1) + + assert context.identity_threat_summary is not None + assert "political orientation (liberal)" in context.identity_threat_summary + assert "religious affiliation (catholic)" in context.identity_threat_summary + assert "race/ethnicity (latino)" in context.identity_threat_summary + assert "parent/family role" in context.identity_threat_summary + def test_cost_unknown_model_returns_null_usd( self, minimal_scenario, diff --git a/tests/test_reasoning_execution.py b/tests/test_reasoning_execution.py new file mode 100644 index 0000000..496ec03 --- /dev/null +++ b/tests/test_reasoning_execution.py @@ -0,0 +1,197 @@ +"""Execution-path tests for two-pass reasoning.""" + +import asyncio +from datetime import datetime +from unittest.mock import AsyncMock, patch + +from extropy.core.llm import TokenUsage +from extropy.core.models import ExposureRecord, ReasoningContext, SimulationRunConfig +from extropy.core.models.scenario import ( + Event, + EventType, + ExposureChannel, + ExposureRule, + InteractionConfig, + InteractionType, + OutcomeConfig, + OutcomeDefinition, + OutcomeType, + ScenarioMeta, + ScenarioSpec, + SeedExposure, + SimulationConfig, + SpreadConfig, +) +from extropy.simulation.reasoning import _reason_agent_two_pass_async + + +def _make_scenario() -> ScenarioSpec: + return ScenarioSpec( + meta=ScenarioMeta( + name="test", + description="Test scenario", + population_spec="population.v1.yaml", + study_db="study.db", + population_id="default", + network_id="default", + created_at=datetime(2024, 1, 1), + ), + event=Event( + type=EventType.PRODUCT_LAUNCH, + content="A new policy is announced.", + source="City Hall", + credibility=0.9, + ambiguity=0.3, + emotional_valence=-0.1, + ), + seed_exposure=SeedExposure( + channels=[ + ExposureChannel( + name="broadcast", + description="Broadcast", + reach="broadcast", + credibility_modifier=1.0, + ) + ], + rules=[ + ExposureRule( + channel="broadcast", + timestep=0, + when="true", + probability=1.0, + ) + ], + ), + interaction=InteractionConfig( + primary_model=InteractionType.PASSIVE_OBSERVATION, + description="Observe", + ), + spread=SpreadConfig(share_probability=0.3), + outcomes=OutcomeConfig( + suggested_outcomes=[ + OutcomeDefinition( + name="adoption", + description="Primary stance", + type=OutcomeType.CATEGORICAL, + required=True, + options=["adopt", "reject", "wait"], + ) + ] + ), + simulation=SimulationConfig(max_timesteps=3), + ) + + +def _make_context() -> ReasoningContext: + return ReasoningContext( + agent_id="a0", + persona="I'm Alex, a resident in the city.", + event_content="A new policy is announced.", + exposure_history=[ + ExposureRecord( + timestep=0, + channel="broadcast", + content="A new policy is announced.", + credibility=0.9, + ) + ], + peer_opinions=[], + agent_name="Alex", + timestep=0, + timestep_unit="day", + ) + + +def test_two_pass_high_fidelity_classifies_private_and_public_positions_separately(): + scenario = _make_scenario() + context = _make_context() + config = SimulationRunConfig( + scenario_path="test.yaml", + output_dir="results", + max_retries=1, + fidelity="high", + ) + + pass1_response = { + "reasoning": "I can see both sides, but I'm conflicted.", + "private_thought": "I privately reject this policy.", + "public_statement": "Let's stay open-minded and give it a chance.", + "reasoning_summary": "I'm conflicted.", + "sentiment": -0.2, + "conviction": 62, + "will_share": True, + "actions": [], + } + + mocked_call = AsyncMock( + side_effect=[ + (pass1_response, TokenUsage(input_tokens=10, output_tokens=7)), + ({"adoption": "reject"}, TokenUsage(input_tokens=4, output_tokens=2)), + ({"adoption": "adopt"}, TokenUsage(input_tokens=5, output_tokens=3)), + ] + ) + + with patch("extropy.simulation.reasoning.simple_call_async", mocked_call): + response = asyncio.run( + _reason_agent_two_pass_async( + context=context, + scenario=scenario, + config=config, + rate_limiter=None, + ) + ) + + assert response is not None + assert response.position == "reject" + assert response.public_position == "adopt" + assert response.pass2_input_tokens == 9 + assert response.pass2_output_tokens == 5 + assert mocked_call.await_count == 3 + assert "I privately reject this policy" in mocked_call.await_args_list[1].kwargs[ + "prompt" + ] + assert "give it a chance" in mocked_call.await_args_list[2].kwargs["prompt"] + + +def test_two_pass_medium_fidelity_uses_private_classification_for_public_position(): + scenario = _make_scenario() + context = _make_context() + config = SimulationRunConfig( + scenario_path="test.yaml", + output_dir="results", + max_retries=1, + fidelity="medium", + ) + + pass1_response = { + "reasoning": "I am leaning toward waiting.", + "private_thought": "I should wait and watch.", + "public_statement": "I'm not sure yet.", + "reasoning_summary": "Leaning to wait.", + "sentiment": 0.0, + "conviction": 45, + "will_share": False, + "actions": [], + } + + mocked_call = AsyncMock( + side_effect=[ + (pass1_response, TokenUsage(input_tokens=8, output_tokens=6)), + ({"adoption": "wait"}, TokenUsage(input_tokens=3, output_tokens=2)), + ] + ) + + with patch("extropy.simulation.reasoning.simple_call_async", mocked_call): + response = asyncio.run( + _reason_agent_two_pass_async( + context=context, + scenario=scenario, + config=config, + rate_limiter=None, + ) + ) + + assert response is not None + assert response.position == "wait" + assert response.public_position == "wait" + assert mocked_call.await_count == 2 diff --git a/tests/test_reasoning_prompts.py b/tests/test_reasoning_prompts.py index e1351c3..8929a1a 100644 --- a/tests/test_reasoning_prompts.py +++ b/tests/test_reasoning_prompts.py @@ -400,6 +400,16 @@ def test_background_context_included(self): prompt = build_pass1_prompt(context, scenario) assert "The economy has been struggling" in prompt + def test_identity_relevance_included(self): + """Identity threat framing appears when context provides it.""" + context = _make_context( + identity_threat_summary="Parts of my identity feel implicated in this conflict." + ) + scenario = _make_scenario() + prompt = build_pass1_prompt(context, scenario) + assert "Identity Relevance" in prompt + assert "identity feel implicated" in prompt + def test_channel_experience_template(self): """Channel experience template replaces generic channel display.""" scenario = _make_scenario(