From 26d1df36b241f213c173dfc7ab3ae7256b7946b8 Mon Sep 17 00:00:00 2001 From: RandomOscillations Date: Tue, 17 Feb 2026 03:10:48 -0500 Subject: [PATCH 1/3] Fix #89 persist and classify THINK vs SAY divergence --- extropy/core/models/simulation.py | 4 + extropy/simulation/engine.py | 10 +- extropy/simulation/reasoning.py | 274 +++++++++++++++++++++--------- tests/test_engine.py | 40 +++++ tests/test_reasoning_execution.py | 197 +++++++++++++++++++++ 5 files changed, 437 insertions(+), 88 deletions(-) create mode 100644 tests/test_reasoning_execution.py diff --git a/extropy/core/models/simulation.py b/extropy/core/models/simulation.py index 2a3ceaa..3ac2d79 100644 --- a/extropy/core/models/simulation.py +++ b/extropy/core/models/simulation.py @@ -357,6 +357,10 @@ class ReasoningResponse(BaseModel): position: str | None = Field( default=None, description="Classified position (filled by Pass 2)" ) + public_position: str | None = Field( + default=None, + description="Public-facing position when THINK/SAY diverges (high fidelity)", + ) sentiment: float | None = Field( default=None, description="Sentiment value (-1 to 1)" ) diff --git a/extropy/simulation/engine.py b/extropy/simulation/engine.py index 850efcc..e56b0da 100644 --- a/extropy/simulation/engine.py +++ b/extropy/simulation/engine.py @@ -926,7 +926,8 @@ def _process_reasoning_chunk( public_conviction = max(0.0, min(1.0, public_conviction)) public_will_share = response.will_share - public_position = response.position + candidate_public_position = response.public_position or response.position + public_position = candidate_public_position if ( old_public_conviction is not None @@ -934,8 +935,8 @@ def _process_reasoning_chunk( ): if ( old_public_position is not None - and response.position is not None - and old_public_position != response.position + and candidate_public_position is not None + and old_public_position != candidate_public_position ): new_conviction = ( public_conviction if public_conviction is not None else 0.0 @@ -943,7 +944,7 @@ def _process_reasoning_chunk( if new_conviction < _MODERATE_CONVICTION: logger.info( f"[CONVICTION] Agent {agent_id}: public flip from {old_public_position} " - f"to {response.position} rejected (old conviction={float_to_conviction(old_public_conviction)}, " + f"to {candidate_public_position} rejected (old conviction={float_to_conviction(old_public_conviction)}, " f"new conviction={float_to_conviction(public_conviction)})" ) public_position = old_public_position @@ -2041,6 +2042,7 @@ def _export_results(self) -> None: "population_size": len(self.agents), "strong_model": self.config.strong, "fast_model": self.config.fast, + "fidelity": self.config.fidelity, "seed": self.seed, "multi_touch_threshold": self.config.multi_touch_threshold, "completed_at": datetime.now().isoformat(), diff --git a/extropy/simulation/reasoning.py b/extropy/simulation/reasoning.py index 1cba318..4977078 100644 --- a/extropy/simulation/reasoning.py +++ b/extropy/simulation/reasoning.py @@ -754,6 +754,107 @@ def _sentiment_to_tone(sentiment: float) -> str: return "strongly opposed" +async def _classify_text_async( + text: str, + scenario: ScenarioSpec, + pass2_schema: dict[str, Any], + classify_model: str | None, + config: SimulationRunConfig, + context: ReasoningContext, + rate_limiter: Any = None, +) -> tuple[dict[str, Any], TokenUsage]: + """Classify free text into structured outcomes using Pass 2 schema.""" + if not text.strip(): + return {}, TokenUsage() + + pass2_prompt = build_pass2_prompt(text, scenario) + usage = TokenUsage() + + for attempt in range(config.max_retries): + try: + if rate_limiter: + estimated_input = len(pass2_prompt) // 4 + estimated_output = 80 + await rate_limiter.routine.acquire( + estimated_input_tokens=estimated_input, + estimated_output_tokens=estimated_output, + ) + + call_start = time.time() + pass2_response, usage = await asyncio.wait_for( + simple_call_async( + prompt=pass2_prompt, + response_schema=pass2_schema, + schema_name="classification", + model=classify_model, + ), + timeout=20.0, + ) + call_elapsed = time.time() - call_start + logger.info(f"[PASS2] Agent {context.agent_id} - {call_elapsed:.2f}s") + + if pass2_response: + return dict(pass2_response), usage + except asyncio.TimeoutError: + logger.warning( + f"[PASS2] Agent {context.agent_id} - attempt {attempt + 1} timed out after 20s" + ) + if attempt == config.max_retries - 1: + logger.warning( + f"[PASS2] Agent {context.agent_id} - all retries exhausted, proceeding without classification" + ) + except Exception as e: + logger.warning( + f"[PASS2] Agent {context.agent_id} - attempt {attempt + 1} failed: {e}" + ) + if attempt == config.max_retries - 1: + logger.warning( + f"[PASS2] Agent {context.agent_id} - all retries exhausted, proceeding without classification" + ) + + return {}, usage + + +def _classify_text_sync( + text: str, + scenario: ScenarioSpec, + pass2_schema: dict[str, Any], + classify_model: str | None, + config: SimulationRunConfig, + context: ReasoningContext, +) -> dict[str, Any]: + """Synchronous counterpart for classifying text into outcomes.""" + if not text.strip(): + return {} + + pass2_prompt = build_pass2_prompt(text, scenario) + for attempt in range(config.max_retries): + try: + call_start = time.time() + pass2_response = simple_call( + prompt=pass2_prompt, + response_schema=pass2_schema, + schema_name="classification", + model=classify_model, + log=True, + ) + call_elapsed = time.time() - call_start + logger.info(f"[PASS2] Agent {context.agent_id} - API call took {call_elapsed:.2f}s") + + if pass2_response: + return dict(pass2_response) + except Exception as e: + logger.warning( + f"[PASS2] Agent {context.agent_id} - attempt {attempt + 1} failed: {e}" + ) + if attempt == config.max_retries - 1: + logger.warning( + f"[PASS2] Agent {context.agent_id} - classification failed, continuing without" + ) + + return {} + + # ============================================================================= # Two-pass reasoning (async) # ============================================================================= @@ -836,6 +937,7 @@ async def _reason_agent_two_pass_async( # Extract Pass 1 fields reasoning = pass1_response.get("reasoning", "") + private_thought = pass1_response.get("private_thought", "") public_statement = pass1_response.get("public_statement", "") reasoning_summary = pass1_response.get("reasoning_summary", "") sentiment = pass1_response.get("sentiment") @@ -851,60 +953,40 @@ async def _reason_agent_two_pass_async( # === Pass 2: Classification (if needed) === pass2_schema = build_pass2_schema(scenario.outcomes) position = None + public_position = None outcomes = {} - pass2_usage = TokenUsage() + pass2_usage_private = TokenUsage() + pass2_usage_public = TokenUsage() if pass2_schema: - pass2_prompt = build_pass2_prompt(reasoning, scenario) - - for attempt in range(config.max_retries): - try: - if rate_limiter: - # Dynamic token estimate from prompt length - estimated_input = len(pass2_prompt) // 4 - estimated_output = 80 # classification is small - await rate_limiter.routine.acquire( - estimated_input_tokens=estimated_input, - estimated_output_tokens=estimated_output, - ) - - call_start = time.time() - pass2_response, pass2_usage = await asyncio.wait_for( - simple_call_async( - prompt=pass2_prompt, - response_schema=pass2_schema, - schema_name="classification", - model=classify_model, - ), - timeout=20.0, - ) - call_elapsed = time.time() - call_start - - logger.info(f"[PASS2] Agent {context.agent_id} - {call_elapsed:.2f}s") + private_text = private_thought or reasoning + outcomes, pass2_usage_private = await _classify_text_async( + text=private_text, + scenario=scenario, + pass2_schema=pass2_schema, + classify_model=classify_model, + config=config, + context=context, + rate_limiter=rate_limiter, + ) + if position_outcome and position_outcome in outcomes: + position = outcomes[position_outcome] + + if config.fidelity == "high": + public_outcomes, pass2_usage_public = await _classify_text_async( + text=public_statement, + scenario=scenario, + pass2_schema=pass2_schema, + classify_model=classify_model, + config=config, + context=context, + rate_limiter=rate_limiter, + ) + if position_outcome and position_outcome in public_outcomes: + public_position = public_outcomes[position_outcome] - if pass2_response: - outcomes = dict(pass2_response) - # Extract primary position from outcomes - if position_outcome and position_outcome in pass2_response: - position = pass2_response[position_outcome] - break - except asyncio.TimeoutError: - logger.warning( - f"[PASS2] Agent {context.agent_id} - attempt {attempt + 1} timed out after 20s" - ) - if attempt == config.max_retries - 1: - logger.warning( - f"[PASS2] Agent {context.agent_id} - all retries exhausted, proceeding without classification" - ) - except Exception as e: - logger.warning( - f"[PASS2] Agent {context.agent_id} - attempt {attempt + 1} failed: {e}" - ) - if attempt == config.max_retries - 1: - # Pass 2 failure is non-fatal — we still have Pass 1 data - logger.warning( - f"[PASS2] Agent {context.agent_id} - all retries exhausted, proceeding without classification" - ) + if public_position is None: + public_position = position # Merge sentiment into outcomes for backwards compat if sentiment is not None: @@ -912,6 +994,7 @@ async def _reason_agent_two_pass_async( return ReasoningResponse( position=position, + public_position=public_position, sentiment=sentiment, conviction=conviction_float, public_statement=public_statement, @@ -923,8 +1006,12 @@ async def _reason_agent_two_pass_async( actions=actions, pass1_input_tokens=pass1_usage.input_tokens, pass1_output_tokens=pass1_usage.output_tokens, - pass2_input_tokens=pass2_usage.input_tokens, - pass2_output_tokens=pass2_usage.output_tokens, + pass2_input_tokens=( + pass2_usage_private.input_tokens + pass2_usage_public.input_tokens + ), + pass2_output_tokens=( + pass2_usage_private.output_tokens + pass2_usage_public.output_tokens + ), ) @@ -1019,8 +1106,10 @@ async def _reason_agent_merged_async( # Extract position from outcomes position = None + public_position = None if position_outcome and position_outcome in response: position = response[position_outcome] + public_position = position # Build outcomes dict (everything except the Pass 1 fields) pass1_fields = { @@ -1038,8 +1127,27 @@ async def _reason_agent_merged_async( if sentiment is not None: outcomes["sentiment"] = sentiment + pass2_usage_public = TokenUsage() + pass2_schema = build_pass2_schema(scenario.outcomes) + if config.fidelity == "high" and pass2_schema: + public_outcomes, pass2_usage_public = await _classify_text_async( + text=public_statement, + scenario=scenario, + pass2_schema=pass2_schema, + classify_model=config.fast or None, + config=config, + context=context, + rate_limiter=rate_limiter, + ) + if position_outcome and position_outcome in public_outcomes: + public_position = public_outcomes[position_outcome] + + if public_position is None: + public_position = position + return ReasoningResponse( position=position, + public_position=public_position, sentiment=sentiment, conviction=conviction_float, public_statement=public_statement, @@ -1051,8 +1159,8 @@ async def _reason_agent_merged_async( actions=actions, pass1_input_tokens=usage.input_tokens, pass1_output_tokens=usage.output_tokens, - pass2_input_tokens=0, - pass2_output_tokens=0, + pass2_input_tokens=pass2_usage_public.input_tokens, + pass2_output_tokens=pass2_usage_public.output_tokens, ) @@ -1133,6 +1241,7 @@ def reason_agent( # Extract Pass 1 fields reasoning = pass1_response.get("reasoning", "") + private_thought = pass1_response.get("private_thought", "") public_statement = pass1_response.get("public_statement", "") reasoning_summary = pass1_response.get("reasoning_summary", "") sentiment = pass1_response.get("sentiment") @@ -1143,41 +1252,37 @@ def reason_agent( # === Pass 2: Classification === pass2_schema = build_pass2_schema(scenario.outcomes) position = None + public_position = None outcomes = {} if pass2_schema: - pass2_prompt = build_pass2_prompt(reasoning, scenario) + private_text = private_thought or reasoning classify_model = config.fast or None + outcomes = _classify_text_sync( + text=private_text, + scenario=scenario, + pass2_schema=pass2_schema, + classify_model=classify_model, + config=config, + context=context, + ) + if position_outcome and position_outcome in outcomes: + position = outcomes[position_outcome] + + if config.fidelity == "high": + public_outcomes = _classify_text_sync( + text=public_statement, + scenario=scenario, + pass2_schema=pass2_schema, + classify_model=classify_model, + config=config, + context=context, + ) + if position_outcome and position_outcome in public_outcomes: + public_position = public_outcomes[position_outcome] - for attempt in range(config.max_retries): - try: - call_start = time.time() - pass2_response = simple_call( - prompt=pass2_prompt, - response_schema=pass2_schema, - schema_name="classification", - model=classify_model, - log=True, - ) - call_elapsed = time.time() - call_start - - logger.info( - f"[PASS2] Agent {context.agent_id} - API call took {call_elapsed:.2f}s" - ) - - if pass2_response: - outcomes = dict(pass2_response) - if position_outcome and position_outcome in pass2_response: - position = pass2_response[position_outcome] - break - except Exception as e: - logger.warning( - f"[PASS2] Agent {context.agent_id} - attempt {attempt + 1} failed: {e}" - ) - if attempt == config.max_retries - 1: - logger.warning( - f"[PASS2] Agent {context.agent_id} - classification failed, continuing without" - ) + if public_position is None: + public_position = position if sentiment is not None: outcomes["sentiment"] = sentiment @@ -1189,6 +1294,7 @@ def reason_agent( return ReasoningResponse( position=position, + public_position=public_position, sentiment=sentiment, conviction=conviction_float, public_statement=public_statement, diff --git a/tests/test_engine.py b/tests/test_engine.py index 7068bb3..796571c 100644 --- a/tests/test_engine.py +++ b/tests/test_engine.py @@ -298,6 +298,45 @@ def test_firm_agent_accepts_moderate_flip( assert final_state.public_position == "reject" assert final_state.position == "adopt" + def test_public_position_prefers_explicit_public_field( + self, + minimal_scenario, + simple_agents, + simple_network, + minimal_pop_spec, + tmp_path, + ): + """Public state should use response.public_position when provided.""" + config = SimulationRunConfig( + scenario_path="test.yaml", + output_dir=str(tmp_path / "output"), + ) + engine = SimulationEngine( + scenario=minimal_scenario, + population_spec=minimal_pop_spec, + agents=simple_agents, + network=simple_network, + config=config, + ) + + old_state = AgentState( + agent_id="a0", + position="undecided", + conviction=CONVICTION_MAP[ConvictionLevel.VERY_UNCERTAIN], + ) + response = _make_reasoning_response( + position="reject", + public_position="adopt", + conviction=CONVICTION_MAP[ConvictionLevel.MODERATE], + ) + + engine._process_reasoning_chunk( + timestep=1, results=[("a0", response)], old_states={"a0": old_state} + ) + + final_state = engine.state_manager.get_agent_state("a0") + assert final_state.public_position == "adopt" + class TestConvictionGatedSharing: """Test that very_uncertain agents don't share.""" @@ -1547,6 +1586,7 @@ def test_cost_in_meta_json( meta = json.load(f) assert "cost" in meta + assert meta["fidelity"] == config.fidelity cost = meta["cost"] assert cost["pivotal_input_tokens"] == 1_000_000 assert cost["pivotal_output_tokens"] == 500_000 diff --git a/tests/test_reasoning_execution.py b/tests/test_reasoning_execution.py new file mode 100644 index 0000000..496ec03 --- /dev/null +++ b/tests/test_reasoning_execution.py @@ -0,0 +1,197 @@ +"""Execution-path tests for two-pass reasoning.""" + +import asyncio +from datetime import datetime +from unittest.mock import AsyncMock, patch + +from extropy.core.llm import TokenUsage +from extropy.core.models import ExposureRecord, ReasoningContext, SimulationRunConfig +from extropy.core.models.scenario import ( + Event, + EventType, + ExposureChannel, + ExposureRule, + InteractionConfig, + InteractionType, + OutcomeConfig, + OutcomeDefinition, + OutcomeType, + ScenarioMeta, + ScenarioSpec, + SeedExposure, + SimulationConfig, + SpreadConfig, +) +from extropy.simulation.reasoning import _reason_agent_two_pass_async + + +def _make_scenario() -> ScenarioSpec: + return ScenarioSpec( + meta=ScenarioMeta( + name="test", + description="Test scenario", + population_spec="population.v1.yaml", + study_db="study.db", + population_id="default", + network_id="default", + created_at=datetime(2024, 1, 1), + ), + event=Event( + type=EventType.PRODUCT_LAUNCH, + content="A new policy is announced.", + source="City Hall", + credibility=0.9, + ambiguity=0.3, + emotional_valence=-0.1, + ), + seed_exposure=SeedExposure( + channels=[ + ExposureChannel( + name="broadcast", + description="Broadcast", + reach="broadcast", + credibility_modifier=1.0, + ) + ], + rules=[ + ExposureRule( + channel="broadcast", + timestep=0, + when="true", + probability=1.0, + ) + ], + ), + interaction=InteractionConfig( + primary_model=InteractionType.PASSIVE_OBSERVATION, + description="Observe", + ), + spread=SpreadConfig(share_probability=0.3), + outcomes=OutcomeConfig( + suggested_outcomes=[ + OutcomeDefinition( + name="adoption", + description="Primary stance", + type=OutcomeType.CATEGORICAL, + required=True, + options=["adopt", "reject", "wait"], + ) + ] + ), + simulation=SimulationConfig(max_timesteps=3), + ) + + +def _make_context() -> ReasoningContext: + return ReasoningContext( + agent_id="a0", + persona="I'm Alex, a resident in the city.", + event_content="A new policy is announced.", + exposure_history=[ + ExposureRecord( + timestep=0, + channel="broadcast", + content="A new policy is announced.", + credibility=0.9, + ) + ], + peer_opinions=[], + agent_name="Alex", + timestep=0, + timestep_unit="day", + ) + + +def test_two_pass_high_fidelity_classifies_private_and_public_positions_separately(): + scenario = _make_scenario() + context = _make_context() + config = SimulationRunConfig( + scenario_path="test.yaml", + output_dir="results", + max_retries=1, + fidelity="high", + ) + + pass1_response = { + "reasoning": "I can see both sides, but I'm conflicted.", + "private_thought": "I privately reject this policy.", + "public_statement": "Let's stay open-minded and give it a chance.", + "reasoning_summary": "I'm conflicted.", + "sentiment": -0.2, + "conviction": 62, + "will_share": True, + "actions": [], + } + + mocked_call = AsyncMock( + side_effect=[ + (pass1_response, TokenUsage(input_tokens=10, output_tokens=7)), + ({"adoption": "reject"}, TokenUsage(input_tokens=4, output_tokens=2)), + ({"adoption": "adopt"}, TokenUsage(input_tokens=5, output_tokens=3)), + ] + ) + + with patch("extropy.simulation.reasoning.simple_call_async", mocked_call): + response = asyncio.run( + _reason_agent_two_pass_async( + context=context, + scenario=scenario, + config=config, + rate_limiter=None, + ) + ) + + assert response is not None + assert response.position == "reject" + assert response.public_position == "adopt" + assert response.pass2_input_tokens == 9 + assert response.pass2_output_tokens == 5 + assert mocked_call.await_count == 3 + assert "I privately reject this policy" in mocked_call.await_args_list[1].kwargs[ + "prompt" + ] + assert "give it a chance" in mocked_call.await_args_list[2].kwargs["prompt"] + + +def test_two_pass_medium_fidelity_uses_private_classification_for_public_position(): + scenario = _make_scenario() + context = _make_context() + config = SimulationRunConfig( + scenario_path="test.yaml", + output_dir="results", + max_retries=1, + fidelity="medium", + ) + + pass1_response = { + "reasoning": "I am leaning toward waiting.", + "private_thought": "I should wait and watch.", + "public_statement": "I'm not sure yet.", + "reasoning_summary": "Leaning to wait.", + "sentiment": 0.0, + "conviction": 45, + "will_share": False, + "actions": [], + } + + mocked_call = AsyncMock( + side_effect=[ + (pass1_response, TokenUsage(input_tokens=8, output_tokens=6)), + ({"adoption": "wait"}, TokenUsage(input_tokens=3, output_tokens=2)), + ] + ) + + with patch("extropy.simulation.reasoning.simple_call_async", mocked_call): + response = asyncio.run( + _reason_agent_two_pass_async( + context=context, + scenario=scenario, + config=config, + rate_limiter=None, + ) + ) + + assert response is not None + assert response.position == "wait" + assert response.public_position == "wait" + assert mocked_call.await_count == 2 From 1563e0d030746995f5c7781773f7a97311c3364a Mon Sep 17 00:00:00 2001 From: RandomOscillations Date: Tue, 17 Feb 2026 03:12:12 -0500 Subject: [PATCH 2/3] Fix #90 add identity-threat framing to reasoning context --- extropy/core/models/simulation.py | 4 + extropy/simulation/engine.py | 192 ++++++++++++++++++++++++++++++ extropy/simulation/reasoning.py | 11 ++ tests/test_engine.py | 47 ++++++++ tests/test_reasoning_prompts.py | 10 ++ 5 files changed, 264 insertions(+) diff --git a/extropy/core/models/simulation.py b/extropy/core/models/simulation.py index 3ac2d79..0173358 100644 --- a/extropy/core/models/simulation.py +++ b/extropy/core/models/simulation.py @@ -310,6 +310,10 @@ class ReasoningContext(BaseModel): background_context: str | None = Field( default=None, description="Scenario-level background context" ) + identity_threat_summary: str | None = Field( + default=None, + description="Deterministic identity-relevance framing when scenario content threatens group identity", + ) agent_names: dict[str, str] = Field( default_factory=dict, description="Mapping of agent_id → first name for resolving peer references", diff --git a/extropy/simulation/engine.py b/extropy/simulation/engine.py index e56b0da..ddfca6e 100644 --- a/extropy/simulation/engine.py +++ b/extropy/simulation/engine.py @@ -84,6 +84,7 @@ _BOUNDED_CONFIDENCE_RHO = 0.35 _PRIVATE_ADJUSTMENT_RHO = 0.12 _PRIVATE_FLIP_CONVICTION = CONVICTION_MAP[ConvictionLevel.FIRM] +_IDENTITY_VALUE_SENTINELS = {"", "unknown", "none", "n/a", "na", "null"} class _StateTimelineAdapter: @@ -1448,6 +1449,9 @@ def _build_reasoning_context( ctx.macro_summary = macro_summary ctx.local_mood_summary = local_mood_summary ctx.background_context = self.scenario.background_context + ctx.identity_threat_summary = self._render_identity_threat_context( + agent, timestep + ) ctx.agent_names = self._agent_names # Populate Phase C fields @@ -1477,6 +1481,194 @@ def _build_reasoning_context( return ctx + @staticmethod + def _identity_value(agent: dict[str, Any], keys: tuple[str, ...]) -> str | None: + """Return the first meaningful identity value from candidate attribute keys.""" + for key in keys: + value = agent.get(key) + if value is None: + continue + if isinstance(value, bool): + if value: + return "yes" + continue + if isinstance(value, (list, tuple, set)): + if not value: + continue + return ", ".join(str(v) for v in value) + text = str(value).strip() + if text.lower() in _IDENTITY_VALUE_SENTINELS: + continue + return text.replace("_", " ") + return None + + def _render_identity_threat_context( + self, + agent: dict[str, Any], + timestep: int, + ) -> str | None: + """Render deterministic identity-relevance framing from scenario + agent data.""" + corpus_parts = [ + self.scenario.meta.description, + self.scenario.event.content, + self.scenario.background_context, + ] + if self.scenario.timeline: + for timeline_event in self.scenario.timeline: + if timeline_event.timestep <= timestep: + corpus_parts.append(timeline_event.event.content) + if timeline_event.description: + corpus_parts.append(timeline_event.description) + corpus = " ".join(str(part).lower() for part in corpus_parts if part).strip() + if not corpus: + return None + + def scenario_mentions(keywords: tuple[str, ...]) -> bool: + return any(keyword in corpus for keyword in keywords) + + identity_dimensions: list[str] = [] + + political_value = self._identity_value( + agent, + ("political_orientation", "political_ideology", "party_affiliation"), + ) + if political_value and scenario_mentions( + ( + "liberal", + "conservative", + "left", + "right", + "republican", + "democrat", + "politic", + "ideolog", + "culture war", + "censorship", + "book ban", + "school board", + " ban ", + ) + ): + identity_dimensions.append(f"political orientation ({political_value})") + + religious_value = self._identity_value( + agent, ("religious_affiliation", "religion", "faith_tradition") + ) + if religious_value and scenario_mentions( + ( + "religio", + "faith", + "church", + "mosque", + "temple", + "christian", + "muslim", + "jewish", + "moral", + ) + ): + identity_dimensions.append(f"religious affiliation ({religious_value})") + + race_value = self._identity_value(agent, ("race_ethnicity", "race", "ethnicity")) + if race_value and scenario_mentions( + ( + "race", + "racial", + "ethnic", + "minority", + "majority", + "immigra", + "inclusion", + "diversity", + "equity", + "civil rights", + "discrimination", + ) + ): + identity_dimensions.append(f"race/ethnicity ({race_value})") + + gender_value = self._identity_value( + agent, + ("gender_identity", "gender", "sexual_orientation"), + ) + if gender_value and scenario_mentions( + ( + "gender", + "women", + "woman", + "men", + "man", + "lgbt", + "lgbtq", + "queer", + "gay", + "lesbian", + "trans", + "transgender", + "sexual orientation", + "sexuality", + "pronoun", + ) + ): + identity_dimensions.append(f"gender/sexual identity ({gender_value})") + + parental_value = self._identity_value( + agent, ("parental_status", "household_role", "family_role") + ) + has_dependents = bool(agent.get("dependents")) + has_children_flag = bool(agent.get("has_children")) + if ( + scenario_mentions( + ( + "parent", + "parents", + "kid", + "kids", + "children", + "school", + "curriculum", + "classroom", + "book", + "library", + "parental rights", + ) + ) + and (parental_value or has_dependents or has_children_flag) + ): + if parental_value: + identity_dimensions.append(f"parent/family role ({parental_value})") + elif has_dependents: + identity_dimensions.append("parent/family role (caregiver)") + else: + identity_dimensions.append("parent/family role") + + citizenship_value = self._identity_value( + agent, ("citizenship_status", "nationality", "country_of_origin") + ) + if citizenship_value and scenario_mentions( + ( + "immigration", + "immigrant", + "border", + "citizenship", + "deport", + "national identity", + "patriot", + ) + ): + identity_dimensions.append(f"citizenship/national identity ({citizenship_value})") + + if not identity_dimensions: + return None + + framed_dimensions = "; ".join(identity_dimensions) + return ( + "This development can feel identity-relevant, not just practical. " + f"Parts of who I am that may feel implicated: {framed_dimensions}. " + "If it feels personal, acknowledge that in both your internal reaction " + "and what you choose to say publicly." + ) + def _get_peer_opinions(self, agent_id: str) -> list[PeerOpinion]: """Get opinions of connected peers who have visibly shared. diff --git a/extropy/simulation/reasoning.py b/extropy/simulation/reasoning.py index 4977078..7e0c293 100644 --- a/extropy/simulation/reasoning.py +++ b/extropy/simulation/reasoning.py @@ -117,6 +117,17 @@ def build_pass1_prompt( ] ) + # --- Identity relevance framing --- + if context.identity_threat_summary: + prompt_parts.extend( + [ + "## Identity Relevance", + "", + context.identity_threat_summary, + "", + ] + ) + # --- Exposure history (named + experiential) --- prompt_parts.extend(["## How This Reached You", ""]) diff --git a/tests/test_engine.py b/tests/test_engine.py index 796571c..b04ac3b 100644 --- a/tests/test_engine.py +++ b/tests/test_engine.py @@ -1682,6 +1682,53 @@ def test_macro_summary_renders_waiting_momentum( text = engine._render_macro_summary(summary, prev_summary=prev_summary) assert "Most people are still watching and waiting" in text + def test_build_reasoning_context_adds_identity_threat_summary( + self, + minimal_scenario, + minimal_pop_spec, + tmp_path, + ): + minimal_scenario.event.content = ( + "The school board voted to remove books discussing race, gender identity, " + "and sexuality from public school libraries." + ) + minimal_scenario.background_context = ( + "Faith groups and parent organizations are publicly campaigning." + ) + agents = [ + { + "_id": "a0", + "first_name": "Alex", + "age": 38, + "political_orientation": "liberal", + "religious_affiliation": "catholic", + "race_ethnicity": "latino", + "gender": "male", + "dependents": [{"name": "Sam", "age": 10}], + } + ] + network = {"meta": {"node_count": 1}, "nodes": [{"id": "a0"}], "edges": []} + config = SimulationRunConfig( + scenario_path="test.yaml", + output_dir=str(tmp_path / "output"), + ) + engine = SimulationEngine( + scenario=minimal_scenario, + population_spec=minimal_pop_spec, + agents=agents, + network=network, + config=config, + ) + + state = engine.state_manager.get_agent_state("a0") + context = engine._build_reasoning_context("a0", state, timestep=1) + + assert context.identity_threat_summary is not None + assert "political orientation (liberal)" in context.identity_threat_summary + assert "religious affiliation (catholic)" in context.identity_threat_summary + assert "race/ethnicity (latino)" in context.identity_threat_summary + assert "parent/family role" in context.identity_threat_summary + def test_cost_unknown_model_returns_null_usd( self, minimal_scenario, diff --git a/tests/test_reasoning_prompts.py b/tests/test_reasoning_prompts.py index e1351c3..8929a1a 100644 --- a/tests/test_reasoning_prompts.py +++ b/tests/test_reasoning_prompts.py @@ -400,6 +400,16 @@ def test_background_context_included(self): prompt = build_pass1_prompt(context, scenario) assert "The economy has been struggling" in prompt + def test_identity_relevance_included(self): + """Identity threat framing appears when context provides it.""" + context = _make_context( + identity_threat_summary="Parts of my identity feel implicated in this conflict." + ) + scenario = _make_scenario() + prompt = build_pass1_prompt(context, scenario) + assert "Identity Relevance" in prompt + assert "identity feel implicated" in prompt + def test_channel_experience_template(self): """Channel experience template replaces generic channel display.""" scenario = _make_scenario( From 35491d15d79b75ff980f01a2f8ab70d61770fb14 Mon Sep 17 00:00:00 2001 From: RandomOscillations Date: Tue, 17 Feb 2026 03:13:07 -0500 Subject: [PATCH 3/3] Fix #82 standardize logging and redact provider payloads --- docs/logging-guidelines.md | 25 +++++ extropy/core/providers/logging.py | 113 +++++++++++++++++--- extropy/population/network/metrics.py | 29 ++--- extropy/population/spec_builder/hydrator.py | 6 +- extropy/simulation/reasoning.py | 4 +- tests/test_logging.py | 69 ++++++++++++ 6 files changed, 213 insertions(+), 33 deletions(-) create mode 100644 docs/logging-guidelines.md create mode 100644 tests/test_logging.py diff --git a/docs/logging-guidelines.md b/docs/logging-guidelines.md new file mode 100644 index 0000000..fa6ecdc --- /dev/null +++ b/docs/logging-guidelines.md @@ -0,0 +1,25 @@ +# Logging Guidelines + +These rules keep logging useful without leaking sensitive data. + +## Principles + +- Use module loggers: `logger = logging.getLogger(__name__)`. +- Keep user-facing CLI output in `console.print(...)`; use `logger.*` for diagnostics. +- Never log raw prompts, private reasoning text, API keys, auth headers, or tokens. +- Prefer structured metadata in logs (model name, token counts, timings, IDs). +- Use levels consistently: + - `DEBUG`: high-volume diagnostics safe for local troubleshooting. + - `INFO`: normal progress and timing. + - `WARNING`: retries, degraded behavior, recoverable failures. + - `ERROR`: terminal failures for the current operation. + +## Provider Debug Logs + +`extropy/core/providers/logging.py` writes sanitized JSON logs: + +- Secret fields are replaced with `[REDACTED_SECRET]`. +- Prompt/content-like text fields are replaced with `[REDACTED_TEXT length=N]`. +- Responses are summarized and sanitized before writing. + +If you add new request/response fields, ensure they pass through the sanitizer. diff --git a/extropy/core/providers/logging.py b/extropy/core/providers/logging.py index 1a991bf..28b742e 100644 --- a/extropy/core/providers/logging.py +++ b/extropy/core/providers/logging.py @@ -1,10 +1,34 @@ """Shared logging helpers for LLM providers.""" import json +import logging from datetime import datetime from pathlib import Path from typing import Any +logger = logging.getLogger(__name__) + +_SECRET_KEY_MARKERS = ("api_key", "authorization", "token", "secret", "password") +_TOKEN_COUNT_KEYS = { + "prompt_tokens", + "completion_tokens", + "total_tokens", + "input_tokens", + "output_tokens", +} +_TEXT_KEY_MARKERS = ( + "prompt", + "content", + "input", + "output", + "text", + "message", + "reasoning", + "statement", + "thought", + "elaboration", +) + def get_logs_dir() -> Path: """Get logs directory, create if needed.""" @@ -13,6 +37,71 @@ def get_logs_dir() -> Path: return logs_dir +def _sanitize_for_logs(value: Any, key_hint: str = "") -> Any: + """Recursively sanitize payloads before persisting debug logs.""" + key = key_hint.lower() + if key in _TOKEN_COUNT_KEYS: + return value + if any(marker in key for marker in _SECRET_KEY_MARKERS): + return "[REDACTED_SECRET]" + + if isinstance(value, dict): + return { + str(k): _sanitize_for_logs(v, key_hint=str(k)) + for k, v in value.items() + } + + if isinstance(value, list): + return [_sanitize_for_logs(item, key_hint=key_hint) for item in value] + + if isinstance(value, tuple): + return [_sanitize_for_logs(item, key_hint=key_hint) for item in value] + + if isinstance(value, str): + if any(marker in key for marker in _TEXT_KEY_MARKERS): + return f"[REDACTED_TEXT length={len(value)}]" + if len(value) > 200: + return value[:200] + "...[truncated]" + return value + + return value + + +def _serialize_response(response: Any) -> Any: + """Convert provider response to a serializable, sanitized structure.""" + if isinstance(response, dict): + return _sanitize_for_logs(response, key_hint="response") + + if hasattr(response, "model_dump"): + try: + dumped = response.model_dump(mode="json", warnings=False) + return _sanitize_for_logs(dumped, key_hint="response") + except Exception: + pass + + usage = getattr(response, "usage", None) + usage_dict = None + if usage is not None: + usage_dict = _sanitize_for_logs( + getattr(usage, "__dict__", str(usage)), + key_hint="usage", + ) + + summary = { + "type": type(response).__name__, + } + model_name = getattr(response, "model", None) + if model_name: + summary["model"] = model_name + response_id = getattr(response, "id", None) + if response_id: + summary["id"] = response_id + if usage_dict is not None: + summary["usage"] = usage_dict + + return summary + + def log_request_response( function_name: str, request: dict, @@ -20,34 +109,26 @@ def log_request_response( provider: str = "", sources: list[str] | None = None, ) -> None: - """Log full request and response to a JSON file.""" + """Log sanitized request/response metadata to a JSON file.""" logs_dir = get_logs_dir() timestamp = datetime.now().strftime("%Y%m%d_%H%M%S_%f") prefix = f"{provider}_" if provider else "" log_file = logs_dir / f"{timestamp}_{prefix}{function_name}.json" - response_dict = None - if hasattr(response, "model_dump"): - try: - response_dict = response.model_dump(mode="json", warnings=False) - except Exception: - response_dict = str(response) - elif hasattr(response, "__dict__"): - response_dict = str(response) - else: - response_dict = str(response) - log_data = { "timestamp": datetime.now().isoformat(), "function": function_name, "provider": provider, - "request": request, - "response": response_dict, + "request": _sanitize_for_logs(request, key_hint="request"), + "response": _serialize_response(response), "sources_extracted": sources or [], } - with open(log_file, "w") as f: - json.dump(log_data, f, indent=2, default=str) + try: + with open(log_file, "w", encoding="utf-8") as f: + json.dump(log_data, f, indent=2, default=str) + except Exception as exc: + logger.warning("Failed to write provider debug log %s: %s", log_file, exc) def extract_error_summary(error_msg: str) -> str: diff --git a/extropy/population/network/metrics.py b/extropy/population/network/metrics.py index b477ece..dc39973 100644 --- a/extropy/population/network/metrics.py +++ b/extropy/population/network/metrics.py @@ -5,11 +5,14 @@ - Node metrics: PageRank, betweenness, cluster ID, echo chamber score """ +import logging from collections import defaultdict from typing import Any from ...core.models import NetworkMetrics, NodeMetrics +logger = logging.getLogger(__name__) + try: import networkx as nx @@ -203,7 +206,7 @@ def validate_network( Args: edges: List of edge dictionaries agent_ids: List of agent IDs - verbose: If True, print detailed metrics + verbose: If True, log detailed metrics Returns: Tuple of (is_valid, metrics, warnings) @@ -212,25 +215,25 @@ def validate_network( is_valid, warnings = metrics.is_valid() if verbose: - print("Network Validation Report:") - print(f" Nodes: {metrics.node_count}") - print(f" Edges: {metrics.edge_count}") - print(f" Avg Degree: {metrics.avg_degree:.2f}") - print(f" Clustering: {metrics.clustering_coefficient:.3f}") - print( + logger.info("Network Validation Report:") + logger.info(" Nodes: %s", metrics.node_count) + logger.info(" Edges: %s", metrics.edge_count) + logger.info(" Avg Degree: %.2f", metrics.avg_degree) + logger.info(" Clustering: %.3f", metrics.clustering_coefficient) + logger.info( f" Avg Path Length: {metrics.avg_path_length:.2f}" if metrics.avg_path_length else " Avg Path Length: N/A (disconnected)" ) - print(f" Modularity: {metrics.modularity:.3f}") - print(f" Largest Component: {metrics.largest_component_ratio:.1%}") - print(f" Degree Assortativity: {metrics.degree_assortativity:.3f}") + logger.info(" Modularity: %.3f", metrics.modularity) + logger.info(" Largest Component: %.1f%%", metrics.largest_component_ratio * 100) + logger.info(" Degree Assortativity: %.3f", metrics.degree_assortativity) if warnings: - print("\nWarnings:") + logger.info("Warnings:") for w in warnings: - print(f" - {w}") + logger.info(" - %s", w) else: - print("\nAll metrics within expected ranges.") + logger.info("All metrics within expected ranges.") return is_valid, metrics, warnings diff --git a/extropy/population/spec_builder/hydrator.py b/extropy/population/spec_builder/hydrator.py index b75461a..226207a 100644 --- a/extropy/population/spec_builder/hydrator.py +++ b/extropy/population/spec_builder/hydrator.py @@ -163,14 +163,14 @@ def hydrate_attributes( population = description def report(step: str, status: str, count: int | None = None): - """Report progress via callback or print.""" + """Report progress via callback or logger.""" if on_progress: on_progress(step, status, count) else: if count is not None: - print(f" {step}: {status} ({count})") + logger.info(" %s: %s (%s)", step, status, count) else: - print(f" {step}: {status}") + logger.info(" %s: %s", step, status) def make_retry_callback(step: str) -> RetryCallback: """Create a retry callback for a specific step.""" diff --git a/extropy/simulation/reasoning.py b/extropy/simulation/reasoning.py index 7e0c293..80bc41b 100644 --- a/extropy/simulation/reasoning.py +++ b/extropy/simulation/reasoning.py @@ -1205,7 +1205,9 @@ def reason_agent( f"[REASON] Agent {context.agent_id} - prompt length: {len(pass1_prompt)} chars" ) logger.debug( - f"[REASON] Agent {context.agent_id} - PROMPT:\n{pass1_prompt[:500]}..." + "[REASON] Agent %s - prompt redacted (length=%s chars)", + context.agent_id, + len(pass1_prompt), ) # === Pass 1: Role-play === diff --git a/tests/test_logging.py b/tests/test_logging.py new file mode 100644 index 0000000..d5f489e --- /dev/null +++ b/tests/test_logging.py @@ -0,0 +1,69 @@ +"""Tests for logging hygiene and redaction.""" + +import json + +import pytest + +from extropy.core.providers import logging as provider_logging +from extropy.population.network import metrics as network_metrics + + +def test_provider_log_request_response_redacts_secrets_and_prompt_content( + tmp_path, monkeypatch +): + monkeypatch.setattr(provider_logging, "get_logs_dir", lambda: tmp_path) + + provider_logging.log_request_response( + function_name="simple_call", + provider="openai", + request={ + "model": "gpt-5-mini", + "api_key": "sk-test-secret", + "messages": [{"role": "user", "content": "Sensitive persona prompt"}], + "Authorization": "Bearer abc123", + "metadata": {"public_note": "ok"}, + }, + response={ + "output_text": "Potentially sensitive response content", + "usage": {"prompt_tokens": 21, "completion_tokens": 9}, + }, + sources=["https://example.com/source"], + ) + + log_files = list(tmp_path.glob("*_openai_simple_call.json")) + assert len(log_files) == 1 + + payload = json.loads(log_files[0].read_text()) + assert payload["request"]["api_key"] == "[REDACTED_SECRET]" + assert payload["request"]["Authorization"] == "[REDACTED_SECRET]" + assert ( + payload["request"]["messages"][0]["content"] + == "[REDACTED_TEXT length=24]" + ) + assert payload["response"]["output_text"] == "[REDACTED_TEXT length=38]" + assert payload["response"]["usage"]["prompt_tokens"] == 21 + assert payload["sources_extracted"] == ["https://example.com/source"] + + +def test_validate_network_verbose_logs_instead_of_print(capsys, caplog): + if not network_metrics.HAS_NETWORKX: + pytest.skip("networkx not installed") + + edges = [ + {"source": "a0", "target": "a1", "weight": 0.8}, + {"source": "a1", "target": "a2", "weight": 0.7}, + ] + agent_ids = ["a0", "a1", "a2"] + + with caplog.at_level("INFO"): + is_valid, metrics, warnings = network_metrics.validate_network( + edges, agent_ids, verbose=True + ) + + captured = capsys.readouterr() + assert captured.out == "" + assert captured.err == "" + assert is_valid in (True, False) + assert metrics.node_count == 3 + assert isinstance(warnings, list) + assert "Network Validation Report:" in caplog.text