From 4685d7a080599efb602191aa072fd8d7a3dc0576 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Sun, 28 Dec 2025 18:09:41 +0000 Subject: [PATCH 1/3] Initial plan From 347efb995d45e62d460fc32aabee0dceba440ca8 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Sun, 28 Dec 2025 18:16:26 +0000 Subject: [PATCH 2/3] Implement DL-5: ProposedChange CRUD operations with MongoDB Co-authored-by: spuentesp <112034353+spuentesp@users.noreply.github.com> --- .../src/monitor_data/db/__init__.py | 5 +- .../data-layer/src/monitor_data/db/mongodb.py | 113 +++ .../src/monitor_data/middleware/auth.py | 7 +- .../monitor_data/schemas/proposed_changes.py | 165 +++++ .../src/monitor_data/tools/mongodb_tools.py | 298 ++++++++ .../test_tools/test_proposed_change_tools.py | 676 ++++++++++++++++++ 6 files changed, 1261 insertions(+), 3 deletions(-) create mode 100644 packages/data-layer/src/monitor_data/db/mongodb.py create mode 100644 packages/data-layer/src/monitor_data/schemas/proposed_changes.py create mode 100644 packages/data-layer/src/monitor_data/tools/mongodb_tools.py create mode 100644 packages/data-layer/tests/test_tools/test_proposed_change_tools.py diff --git a/packages/data-layer/src/monitor_data/db/__init__.py b/packages/data-layer/src/monitor_data/db/__init__.py index ab0c83e..aab5c84 100644 --- a/packages/data-layer/src/monitor_data/db/__init__.py +++ b/packages/data-layer/src/monitor_data/db/__init__.py @@ -15,15 +15,16 @@ """ from monitor_data.db.neo4j import Neo4jClient, get_neo4j_client +from monitor_data.db.mongodb import MongoDBClient, get_mongodb_client -# from monitor_data.db.mongodb import MongoDBClient # from monitor_data.db.qdrant import QdrantClient # from monitor_data.db.minio import MinIOClient __all__ = [ "Neo4jClient", "get_neo4j_client", - # "MongoDBClient", + "MongoDBClient", + "get_mongodb_client", # "QdrantClient", # "MinIOClient", ] diff --git a/packages/data-layer/src/monitor_data/db/mongodb.py b/packages/data-layer/src/monitor_data/db/mongodb.py new file mode 100644 index 0000000..c8586f9 --- /dev/null +++ b/packages/data-layer/src/monitor_data/db/mongodb.py @@ -0,0 +1,113 @@ +""" +MongoDB client for MONITOR Data Layer. + +LAYER: 1 (data-layer) +IMPORTS FROM: External libraries only +CALLED BY: mongodb_tools.py + +This provides a thin wrapper around pymongo for narrative document storage. +Collections: scenes, turns, proposed_changes, resolutions, memories, etc. +""" + +import os +from typing import Optional +from pymongo import MongoClient +from pymongo.database import Database + + +class MongoDBClient: + """ + MongoDB client for MONITOR narrative documents. + + Provides access to MongoDB collections with connection pooling. + Thread-safe client suitable for use across requests. + """ + + def __init__( + self, + uri: Optional[str] = None, + database: str = "monitor", + ): + """ + Initialize MongoDB client. + + Args: + uri: MongoDB connection URI (defaults to MONGODB_URI env var) + database: Database name (defaults to "monitor") + """ + self.uri = uri or os.getenv("MONGODB_URI", "mongodb://localhost:27017") + self.database_name = database + self._client: Optional[MongoClient] = None + self._db: Optional[Database] = None + + def connect(self) -> None: + """Establish connection to MongoDB.""" + if self._client is None: + self._client = MongoClient(self.uri) + self._db = self._client[self.database_name] + + def close(self) -> None: + """Close MongoDB connection.""" + if self._client: + self._client.close() + self._client = None + self._db = None + + @property + def db(self) -> Database: + """ + Get the MongoDB database instance. + + Returns: + MongoDB Database object + + Raises: + RuntimeError: If not connected + """ + if self._db is None: + raise RuntimeError("MongoDB client not connected. Call connect() first.") + return self._db + + def verify_connectivity(self) -> bool: + """ + Verify MongoDB connection is working. + + Returns: + True if connection is healthy + """ + try: + if self._client: + self._client.admin.command("ping") + return True + return False + except Exception: + return False + + +# ============================================================================= +# SINGLETON CLIENT +# ============================================================================= + +_mongodb_client: Optional[MongoDBClient] = None + + +def get_mongodb_client() -> MongoDBClient: + """ + Get or create the singleton MongoDB client. + + Returns: + MongoDBClient instance (connected) + """ + global _mongodb_client + if _mongodb_client is None: + _mongodb_client = MongoDBClient() + _mongodb_client.connect() + return _mongodb_client + + +def close_mongodb_client() -> None: + """Close the singleton MongoDB client.""" + global _mongodb_client + if _mongodb_client: + _mongodb_client.close() + _mongodb_client = None diff --git a/packages/data-layer/src/monitor_data/middleware/auth.py b/packages/data-layer/src/monitor_data/middleware/auth.py index 5dcc057..b66bca8 100644 --- a/packages/data-layer/src/monitor_data/middleware/auth.py +++ b/packages/data-layer/src/monitor_data/middleware/auth.py @@ -100,8 +100,13 @@ "mongodb_get_turns": ["*"], "mongodb_undo_turn": ["Orchestrator"], # ========================================================================= - # MONGODB OPERATIONS - Proposals + # MONGODB OPERATIONS - Proposals (DL-5) # ========================================================================= + "mongodb_create_proposed_change": ["*"], + "mongodb_get_proposed_change": ["*"], + "mongodb_list_proposed_changes": ["*"], + "mongodb_update_proposed_change": ["CanonKeeper"], + # Legacy proposal operations (deprecated, use proposed_change instead) "mongodb_create_proposal": ["Narrator", "Resolver", "CanonKeeper"], "mongodb_get_proposals": ["*"], "mongodb_update_proposal": ["CanonKeeper"], diff --git a/packages/data-layer/src/monitor_data/schemas/proposed_changes.py b/packages/data-layer/src/monitor_data/schemas/proposed_changes.py new file mode 100644 index 0000000..7fd50ca --- /dev/null +++ b/packages/data-layer/src/monitor_data/schemas/proposed_changes.py @@ -0,0 +1,165 @@ +""" +Pydantic schemas for ProposedChange operations in MongoDB. + +LAYER: 1 (data-layer) +IMPORTS FROM: External libraries (pydantic, uuid, datetime) and base schemas +CALLED BY: mongodb_tools.py + +These schemas define the data contracts for ProposedChange CRUD operations. +ProposedChanges are staging documents that CanonKeeper evaluates at scene end. +""" + +from datetime import datetime +from typing import Optional, List, Dict, Any +from uuid import UUID + +from pydantic import BaseModel, Field + +from monitor_data.schemas.base import Authority, ProposalStatus, ProposalType + + +# ============================================================================= +# PROPOSED CHANGE SCHEMAS +# ============================================================================= + + +class EvidenceRef(BaseModel): + """Reference to supporting evidence for a proposed change.""" + + type: str = Field( + description="Type of evidence: turn, snippet, source, fact, rule" + ) + ref_id: UUID = Field(description="UUID of the referenced document/node") + + +class ProposedChangeCreate(BaseModel): + """Request to create a ProposedChange document.""" + + change_type: ProposalType = Field( + description="Type of change being proposed" + ) + content: Dict[str, Any] = Field( + description="Flexible JSON payload specific to change_type" + ) + scene_id: Optional[UUID] = Field( + None, + description="Scene this change was proposed in (optional for system/ingest)", + ) + story_id: Optional[UUID] = Field( + None, + description="Story this change relates to", + ) + universe_id: UUID = Field( + description="Universe this change affects" + ) + turn_id: Optional[UUID] = Field( + None, + description="Turn that proposed this change (if applicable)", + ) + confidence: float = Field( + ge=0.0, + le=1.0, + default=1.0, + description="Confidence level in this proposal", + ) + authority: Authority = Field( + default=Authority.SYSTEM, + description="Who is proposing this change", + ) + evidence_refs: List[EvidenceRef] = Field( + default_factory=list, + description="Supporting evidence for this proposal", + ) + proposed_by: str = Field( + description="Agent or system component that proposed this change" + ) + + +class ProposedChangeUpdate(BaseModel): + """Request to update a ProposedChange (status transitions).""" + + status: ProposalStatus = Field( + description="New status: accepted or rejected" + ) + decision_reason: Optional[str] = Field( + None, + max_length=2000, + description="Rationale for accepting/rejecting", + ) + canonical_ref: Optional[UUID] = Field( + None, + description="If accepted, the Neo4j node/edge ID that was created", + ) + decided_by: str = Field( + default="CanonKeeper", + description="Agent that made the decision (usually CanonKeeper)", + ) + + +class ProposedChangeResponse(BaseModel): + """Response with ProposedChange document data.""" + + proposal_id: UUID + change_type: ProposalType + content: Dict[str, Any] + scene_id: Optional[UUID] + story_id: Optional[UUID] + universe_id: UUID + turn_id: Optional[UUID] + confidence: float + authority: Authority + evidence_refs: List[EvidenceRef] + proposed_by: str + status: ProposalStatus + decision_reason: Optional[str] = None + canonical_ref: Optional[UUID] = None + decided_by: Optional[str] = None + created_at: datetime + decided_at: Optional[datetime] = None + + model_config = {"from_attributes": True} + + +class ProposedChangeFilter(BaseModel): + """Filter parameters for listing proposed changes.""" + + scene_id: Optional[UUID] = Field( + None, + description="Filter by scene ID", + ) + story_id: Optional[UUID] = Field( + None, + description="Filter by story ID", + ) + universe_id: Optional[UUID] = Field( + None, + description="Filter by universe ID", + ) + status: Optional[ProposalStatus] = Field( + None, + description="Filter by status", + ) + change_type: Optional[ProposalType] = Field( + None, + description="Filter by change type", + ) + limit: int = Field( + default=50, + ge=1, + le=1000, + description="Maximum number of results", + ) + offset: int = Field( + default=0, + ge=0, + description="Number of results to skip", + ) + + +class ProposedChangeListResponse(BaseModel): + """Response with list of proposed changes and pagination info.""" + + proposals: List[ProposedChangeResponse] + total: int + limit: int + offset: int diff --git a/packages/data-layer/src/monitor_data/tools/mongodb_tools.py b/packages/data-layer/src/monitor_data/tools/mongodb_tools.py new file mode 100644 index 0000000..03410b5 --- /dev/null +++ b/packages/data-layer/src/monitor_data/tools/mongodb_tools.py @@ -0,0 +1,298 @@ +""" +MongoDB MCP Tools for MONITOR Data Layer. + +LAYER: 1 (data-layer) +IMPORTS FROM: External libraries and data-layer modules only +CALLED BY: Agents (Layer 2) via MCP protocol + +These tools expose MongoDB operations via the MCP server. +ProposedChange operations: any agent can create, only CanonKeeper can update status. +""" + +from datetime import datetime, timezone +from typing import Dict, List, Optional, Any +from uuid import UUID, uuid4 + +from monitor_data.db.mongodb import get_mongodb_client +from monitor_data.schemas.proposed_changes import ( + ProposedChangeCreate, + ProposedChangeUpdate, + ProposedChangeResponse, + ProposedChangeFilter, + ProposedChangeListResponse, + EvidenceRef, +) +from monitor_data.schemas.base import ProposalStatus + + +# ============================================================================= +# PROPOSED CHANGE OPERATIONS +# ============================================================================= + + +def mongodb_create_proposed_change(params: ProposedChangeCreate) -> ProposedChangeResponse: + """ + Create a new ProposedChange document in MongoDB. + + Authority: Any agent (*) + Use Case: DL-5 + + Args: + params: ProposedChange creation parameters + + Returns: + ProposedChangeResponse with created document data + + Raises: + ValueError: If validation fails + """ + client = get_mongodb_client() + collection = client.db["proposed_changes"] + + proposal_id = uuid4() + created_at = datetime.now(timezone.utc) + + # Build document + document = { + "proposal_id": str(proposal_id), + "change_type": params.change_type.value, + "content": params.content, + "scene_id": str(params.scene_id) if params.scene_id else None, + "story_id": str(params.story_id) if params.story_id else None, + "universe_id": str(params.universe_id), + "turn_id": str(params.turn_id) if params.turn_id else None, + "confidence": params.confidence, + "authority": params.authority.value, + "evidence_refs": [ + {"type": ref.type, "ref_id": str(ref.ref_id)} + for ref in params.evidence_refs + ], + "proposed_by": params.proposed_by, + "status": ProposalStatus.PENDING.value, + "decision_reason": None, + "canonical_ref": None, + "decided_by": None, + "created_at": created_at, + "decided_at": None, + } + + # Insert document + collection.insert_one(document) + + # Return response + return ProposedChangeResponse( + proposal_id=proposal_id, + change_type=params.change_type, + content=params.content, + scene_id=params.scene_id, + story_id=params.story_id, + universe_id=params.universe_id, + turn_id=params.turn_id, + confidence=params.confidence, + authority=params.authority, + evidence_refs=params.evidence_refs, + proposed_by=params.proposed_by, + status=ProposalStatus.PENDING, + decision_reason=None, + canonical_ref=None, + decided_by=None, + created_at=created_at, + decided_at=None, + ) + + +def mongodb_get_proposed_change(proposal_id: UUID) -> Optional[ProposedChangeResponse]: + """ + Get a ProposedChange document by ID. + + Authority: Any agent (*) + Use Case: DL-5 + + Args: + proposal_id: UUID of the proposed change + + Returns: + ProposedChangeResponse if found, None otherwise + """ + client = get_mongodb_client() + collection = client.db["proposed_changes"] + + document = collection.find_one({"proposal_id": str(proposal_id)}) + + if not document: + return None + + return _document_to_response(document) + + +def mongodb_list_proposed_changes( + filters: Optional[ProposedChangeFilter] = None, +) -> ProposedChangeListResponse: + """ + List proposed changes with optional filtering and pagination. + + Authority: Any agent (*) + Use Case: DL-5 + + Args: + filters: Optional filter parameters (scene_id, status, change_type, limit, offset) + + Returns: + ProposedChangeListResponse with proposals and pagination info + """ + client = get_mongodb_client() + collection = client.db["proposed_changes"] + + if filters is None: + filters = ProposedChangeFilter() + + # Build query filter + query: Dict[str, Any] = {} + + if filters.scene_id: + query["scene_id"] = str(filters.scene_id) + + if filters.story_id: + query["story_id"] = str(filters.story_id) + + if filters.universe_id: + query["universe_id"] = str(filters.universe_id) + + if filters.status: + query["status"] = filters.status.value + + if filters.change_type: + query["change_type"] = filters.change_type.value + + # Count total + total = collection.count_documents(query) + + # Get documents with pagination + cursor = ( + collection.find(query) + .sort("created_at", -1) + .skip(filters.offset) + .limit(filters.limit) + ) + + proposals = [_document_to_response(doc) for doc in cursor] + + return ProposedChangeListResponse( + proposals=proposals, + total=total, + limit=filters.limit, + offset=filters.offset, + ) + + +def mongodb_update_proposed_change( + proposal_id: UUID, params: ProposedChangeUpdate +) -> ProposedChangeResponse: + """ + Update a ProposedChange document (status transition). + + Authority: CanonKeeper only + Use Case: DL-5 + + Args: + proposal_id: UUID of the proposed change + params: Update parameters (status, decision_reason, canonical_ref) + + Returns: + ProposedChangeResponse with updated document data + + Raises: + ValueError: If proposal doesn't exist or invalid status transition + """ + client = get_mongodb_client() + collection = client.db["proposed_changes"] + + # Get existing document + document = collection.find_one({"proposal_id": str(proposal_id)}) + if not document: + raise ValueError(f"ProposedChange {proposal_id} not found") + + # Validate status transition + current_status = ProposalStatus(document["status"]) + if current_status != ProposalStatus.PENDING: + raise ValueError( + f"Cannot update ProposedChange with status '{current_status.value}'. " + "Only 'pending' proposals can be updated to 'accepted' or 'rejected'." + ) + + if params.status == ProposalStatus.PENDING: + raise ValueError( + "Cannot transition to 'pending' status. " + "Valid transitions are: pending → accepted, pending → rejected." + ) + + # Validate canonical_ref for accepted proposals + if params.status == ProposalStatus.ACCEPTED and not params.canonical_ref: + raise ValueError( + "canonical_ref is required when accepting a proposal. " + "It must reference the created Neo4j node/edge ID." + ) + + # Build update + decided_at = datetime.now(timezone.utc) + update_doc = { + "$set": { + "status": params.status.value, + "decision_reason": params.decision_reason, + "canonical_ref": str(params.canonical_ref) if params.canonical_ref else None, + "decided_by": params.decided_by, + "decided_at": decided_at, + } + } + + # Update document + collection.update_one( + {"proposal_id": str(proposal_id)}, + update_doc, + ) + + # Get updated document + updated_doc = collection.find_one({"proposal_id": str(proposal_id)}) + if not updated_doc: + raise ValueError(f"ProposedChange {proposal_id} not found after update") + + return _document_to_response(updated_doc) + + +# ============================================================================= +# HELPER FUNCTIONS +# ============================================================================= + + +def _document_to_response(document: Dict[str, Any]) -> ProposedChangeResponse: + """ + Convert a MongoDB document to ProposedChangeResponse. + + Args: + document: Raw MongoDB document + + Returns: + ProposedChangeResponse object + """ + return ProposedChangeResponse( + proposal_id=UUID(document["proposal_id"]), + change_type=document["change_type"], + content=document["content"], + scene_id=UUID(document["scene_id"]) if document.get("scene_id") else None, + story_id=UUID(document["story_id"]) if document.get("story_id") else None, + universe_id=UUID(document["universe_id"]), + turn_id=UUID(document["turn_id"]) if document.get("turn_id") else None, + confidence=document["confidence"], + authority=document["authority"], + evidence_refs=[ + EvidenceRef(type=ref["type"], ref_id=UUID(ref["ref_id"])) + for ref in document.get("evidence_refs", []) + ], + proposed_by=document["proposed_by"], + status=document["status"], + decision_reason=document.get("decision_reason"), + canonical_ref=UUID(document["canonical_ref"]) if document.get("canonical_ref") else None, + decided_by=document.get("decided_by"), + created_at=document["created_at"], + decided_at=document.get("decided_at"), + ) diff --git a/packages/data-layer/tests/test_tools/test_proposed_change_tools.py b/packages/data-layer/tests/test_tools/test_proposed_change_tools.py new file mode 100644 index 0000000..00960f0 --- /dev/null +++ b/packages/data-layer/tests/test_tools/test_proposed_change_tools.py @@ -0,0 +1,676 @@ +""" +Unit tests for MongoDB ProposedChange operations. + +Tests cover: +- mongodb_create_proposed_change +- mongodb_get_proposed_change +- mongodb_list_proposed_changes +- mongodb_update_proposed_change +""" + +from typing import Dict, Any +from unittest.mock import Mock, patch, MagicMock +from uuid import UUID, uuid4 +from datetime import datetime, timezone + +import pytest + +from monitor_data.schemas.proposed_changes import ( + ProposedChangeCreate, + ProposedChangeUpdate, + ProposedChangeFilter, + EvidenceRef, +) +from monitor_data.schemas.base import Authority, ProposalStatus, ProposalType +from monitor_data.tools.mongodb_tools import ( + mongodb_create_proposed_change, + mongodb_get_proposed_change, + mongodb_list_proposed_changes, + mongodb_update_proposed_change, +) + + +# ============================================================================= +# TESTS: mongodb_create_proposed_change +# ============================================================================= + + +@patch("monitor_data.tools.mongodb_tools.get_mongodb_client") +def test_create_proposed_change_fact(mock_get_client: Mock): + """Test creating a proposed change for a fact.""" + # Setup mock + mock_client = MagicMock() + mock_collection = MagicMock() + mock_client.db = {"proposed_changes": mock_collection} + mock_get_client.return_value = mock_client + + universe_id = uuid4() + scene_id = uuid4() + story_id = uuid4() + + params = ProposedChangeCreate( + change_type=ProposalType.FACT, + content={ + "statement": "The wizard cast a fireball", + "entity_ids": [str(uuid4())], + }, + scene_id=scene_id, + story_id=story_id, + universe_id=universe_id, + confidence=0.9, + authority=Authority.PLAYER, + proposed_by="Narrator", + ) + + result = mongodb_create_proposed_change(params) + + # Assertions + assert result.change_type == ProposalType.FACT + assert result.content == params.content + assert result.scene_id == scene_id + assert result.story_id == story_id + assert result.universe_id == universe_id + assert result.confidence == 0.9 + assert result.authority == Authority.PLAYER + assert result.proposed_by == "Narrator" + assert result.status == ProposalStatus.PENDING + assert result.decision_reason is None + assert result.canonical_ref is None + assert mock_collection.insert_one.call_count == 1 + + +@patch("monitor_data.tools.mongodb_tools.get_mongodb_client") +def test_create_proposed_change_entity(mock_get_client: Mock): + """Test creating a proposed change for an entity.""" + # Setup mock + mock_client = MagicMock() + mock_collection = MagicMock() + mock_client.db = {"proposed_changes": mock_collection} + mock_get_client.return_value = mock_client + + universe_id = uuid4() + + params = ProposedChangeCreate( + change_type=ProposalType.ENTITY, + content={ + "name": "Gandalf", + "entity_type": "character", + "properties": {"class": "wizard", "level": 20}, + }, + universe_id=universe_id, + proposed_by="CanonKeeper", + ) + + result = mongodb_create_proposed_change(params) + + # Assertions + assert result.change_type == ProposalType.ENTITY + assert result.content["name"] == "Gandalf" + assert result.universe_id == universe_id + assert result.status == ProposalStatus.PENDING + assert mock_collection.insert_one.call_count == 1 + + +@patch("monitor_data.tools.mongodb_tools.get_mongodb_client") +def test_create_proposed_change_relationship(mock_get_client: Mock): + """Test creating a proposed change for a relationship.""" + # Setup mock + mock_client = MagicMock() + mock_collection = MagicMock() + mock_client.db = {"proposed_changes": mock_collection} + mock_get_client.return_value = mock_client + + universe_id = uuid4() + from_entity = uuid4() + to_entity = uuid4() + + params = ProposedChangeCreate( + change_type=ProposalType.RELATIONSHIP, + content={ + "from": str(from_entity), + "to": str(to_entity), + "rel_type": "ALLY_OF", + "properties": {"since": "2024-01-01"}, + }, + universe_id=universe_id, + proposed_by="Narrator", + ) + + result = mongodb_create_proposed_change(params) + + # Assertions + assert result.change_type == ProposalType.RELATIONSHIP + assert result.content["rel_type"] == "ALLY_OF" + assert result.status == ProposalStatus.PENDING + assert mock_collection.insert_one.call_count == 1 + + +@patch("monitor_data.tools.mongodb_tools.get_mongodb_client") +def test_create_proposed_change_with_evidence(mock_get_client: Mock): + """Test creating a proposed change with evidence references.""" + # Setup mock + mock_client = MagicMock() + mock_collection = MagicMock() + mock_client.db = {"proposed_changes": mock_collection} + mock_get_client.return_value = mock_client + + universe_id = uuid4() + turn_id = uuid4() + source_id = uuid4() + + params = ProposedChangeCreate( + change_type=ProposalType.FACT, + content={"statement": "Test fact"}, + universe_id=universe_id, + evidence_refs=[ + EvidenceRef(type="turn", ref_id=turn_id), + EvidenceRef(type="source", ref_id=source_id), + ], + proposed_by="Narrator", + ) + + result = mongodb_create_proposed_change(params) + + # Assertions + assert len(result.evidence_refs) == 2 + assert result.evidence_refs[0].type == "turn" + assert result.evidence_refs[0].ref_id == turn_id + assert result.evidence_refs[1].type == "source" + assert result.evidence_refs[1].ref_id == source_id + + +# ============================================================================= +# TESTS: mongodb_get_proposed_change +# ============================================================================= + + +@patch("monitor_data.tools.mongodb_tools.get_mongodb_client") +def test_get_proposed_change_success(mock_get_client: Mock): + """Test getting a proposed change by ID.""" + # Setup mock + mock_client = MagicMock() + mock_collection = MagicMock() + mock_client.db = {"proposed_changes": mock_collection} + mock_get_client.return_value = mock_client + + proposal_id = uuid4() + universe_id = uuid4() + created_at = datetime.now(timezone.utc) + + mock_document = { + "proposal_id": str(proposal_id), + "change_type": ProposalType.FACT.value, + "content": {"statement": "Test fact"}, + "scene_id": None, + "story_id": None, + "universe_id": str(universe_id), + "turn_id": None, + "confidence": 1.0, + "authority": Authority.SYSTEM.value, + "evidence_refs": [], + "proposed_by": "System", + "status": ProposalStatus.PENDING.value, + "decision_reason": None, + "canonical_ref": None, + "decided_by": None, + "created_at": created_at, + "decided_at": None, + } + mock_collection.find_one.return_value = mock_document + + result = mongodb_get_proposed_change(proposal_id) + + # Assertions + assert result is not None + assert result.proposal_id == proposal_id + assert result.change_type == ProposalType.FACT + assert result.status == ProposalStatus.PENDING + assert mock_collection.find_one.call_count == 1 + + +@patch("monitor_data.tools.mongodb_tools.get_mongodb_client") +def test_get_proposed_change_not_found(mock_get_client: Mock): + """Test getting a non-existent proposed change.""" + # Setup mock + mock_client = MagicMock() + mock_collection = MagicMock() + mock_client.db = {"proposed_changes": mock_collection} + mock_get_client.return_value = mock_client + + mock_collection.find_one.return_value = None + + result = mongodb_get_proposed_change(uuid4()) + + # Assertions + assert result is None + + +# ============================================================================= +# TESTS: mongodb_list_proposed_changes +# ============================================================================= + + +@patch("monitor_data.tools.mongodb_tools.get_mongodb_client") +def test_list_by_scene(mock_get_client: Mock): + """Test listing proposed changes filtered by scene_id.""" + # Setup mock + mock_client = MagicMock() + mock_collection = MagicMock() + mock_client.db = {"proposed_changes": mock_collection} + mock_get_client.return_value = mock_client + + scene_id = uuid4() + universe_id = uuid4() + created_at = datetime.now(timezone.utc) + + mock_documents = [ + { + "proposal_id": str(uuid4()), + "change_type": ProposalType.FACT.value, + "content": {"statement": f"Fact {i}"}, + "scene_id": str(scene_id), + "story_id": None, + "universe_id": str(universe_id), + "turn_id": None, + "confidence": 1.0, + "authority": Authority.SYSTEM.value, + "evidence_refs": [], + "proposed_by": "Narrator", + "status": ProposalStatus.PENDING.value, + "decision_reason": None, + "canonical_ref": None, + "decided_by": None, + "created_at": created_at, + "decided_at": None, + } + for i in range(3) + ] + + mock_collection.count_documents.return_value = 3 + mock_cursor = MagicMock() + mock_cursor.sort.return_value = mock_cursor + mock_cursor.skip.return_value = mock_cursor + mock_cursor.limit.return_value = mock_cursor + mock_cursor.__iter__.return_value = iter(mock_documents) + mock_collection.find.return_value = mock_cursor + + filters = ProposedChangeFilter(scene_id=scene_id) + result = mongodb_list_proposed_changes(filters) + + # Assertions + assert len(result.proposals) == 3 + assert result.total == 3 + assert all(p.scene_id == scene_id for p in result.proposals) + mock_collection.find.assert_called_once_with({"scene_id": str(scene_id)}) + + +@patch("monitor_data.tools.mongodb_tools.get_mongodb_client") +def test_list_by_status(mock_get_client: Mock): + """Test listing proposed changes filtered by status.""" + # Setup mock + mock_client = MagicMock() + mock_collection = MagicMock() + mock_client.db = {"proposed_changes": mock_collection} + mock_get_client.return_value = mock_client + + universe_id = uuid4() + created_at = datetime.now(timezone.utc) + + mock_documents = [ + { + "proposal_id": str(uuid4()), + "change_type": ProposalType.FACT.value, + "content": {"statement": "Pending fact"}, + "scene_id": None, + "story_id": None, + "universe_id": str(universe_id), + "turn_id": None, + "confidence": 1.0, + "authority": Authority.SYSTEM.value, + "evidence_refs": [], + "proposed_by": "Narrator", + "status": ProposalStatus.PENDING.value, + "decision_reason": None, + "canonical_ref": None, + "decided_by": None, + "created_at": created_at, + "decided_at": None, + } + ] + + mock_collection.count_documents.return_value = 1 + mock_cursor = MagicMock() + mock_cursor.sort.return_value = mock_cursor + mock_cursor.skip.return_value = mock_cursor + mock_cursor.limit.return_value = mock_cursor + mock_cursor.__iter__.return_value = iter(mock_documents) + mock_collection.find.return_value = mock_cursor + + filters = ProposedChangeFilter(status=ProposalStatus.PENDING) + result = mongodb_list_proposed_changes(filters) + + # Assertions + assert len(result.proposals) == 1 + assert result.proposals[0].status == ProposalStatus.PENDING + mock_collection.find.assert_called_once_with({"status": ProposalStatus.PENDING.value}) + + +@patch("monitor_data.tools.mongodb_tools.get_mongodb_client") +def test_list_by_change_type(mock_get_client: Mock): + """Test listing proposed changes filtered by change_type.""" + # Setup mock + mock_client = MagicMock() + mock_collection = MagicMock() + mock_client.db = {"proposed_changes": mock_collection} + mock_get_client.return_value = mock_client + + mock_collection.count_documents.return_value = 0 + mock_cursor = MagicMock() + mock_cursor.sort.return_value = mock_cursor + mock_cursor.skip.return_value = mock_cursor + mock_cursor.limit.return_value = mock_cursor + mock_cursor.__iter__.return_value = iter([]) + mock_collection.find.return_value = mock_cursor + + filters = ProposedChangeFilter(change_type=ProposalType.ENTITY) + result = mongodb_list_proposed_changes(filters) + + # Assertions + assert len(result.proposals) == 0 + mock_collection.find.assert_called_once_with({"change_type": ProposalType.ENTITY.value}) + + +# ============================================================================= +# TESTS: mongodb_update_proposed_change +# ============================================================================= + + +@patch("monitor_data.tools.mongodb_tools.get_mongodb_client") +def test_update_accept(mock_get_client: Mock): + """Test accepting a pending proposal.""" + # Setup mock + mock_client = MagicMock() + mock_collection = MagicMock() + mock_client.db = {"proposed_changes": mock_collection} + mock_get_client.return_value = mock_client + + proposal_id = uuid4() + universe_id = uuid4() + canonical_ref = uuid4() + created_at = datetime.now(timezone.utc) + + # Mock existing document (pending) + existing_doc = { + "proposal_id": str(proposal_id), + "change_type": ProposalType.FACT.value, + "content": {"statement": "Test fact"}, + "scene_id": None, + "story_id": None, + "universe_id": str(universe_id), + "turn_id": None, + "confidence": 1.0, + "authority": Authority.SYSTEM.value, + "evidence_refs": [], + "proposed_by": "Narrator", + "status": ProposalStatus.PENDING.value, + "decision_reason": None, + "canonical_ref": None, + "decided_by": None, + "created_at": created_at, + "decided_at": None, + } + + # Mock updated document (accepted) + decided_at = datetime.now(timezone.utc) + updated_doc = { + **existing_doc, + "status": ProposalStatus.ACCEPTED.value, + "decision_reason": "Valid fact", + "canonical_ref": str(canonical_ref), + "decided_by": "CanonKeeper", + "decided_at": decided_at, + } + + # Setup mock to return existing doc first, then updated doc + mock_collection.find_one.side_effect = [existing_doc, updated_doc] + + params = ProposedChangeUpdate( + status=ProposalStatus.ACCEPTED, + decision_reason="Valid fact", + canonical_ref=canonical_ref, + decided_by="CanonKeeper", + ) + + result = mongodb_update_proposed_change(proposal_id, params) + + # Assertions + assert result.status == ProposalStatus.ACCEPTED + assert result.decision_reason == "Valid fact" + assert result.canonical_ref == canonical_ref + assert result.decided_by == "CanonKeeper" + assert mock_collection.update_one.call_count == 1 + + +@patch("monitor_data.tools.mongodb_tools.get_mongodb_client") +def test_update_reject(mock_get_client: Mock): + """Test rejecting a pending proposal.""" + # Setup mock + mock_client = MagicMock() + mock_collection = MagicMock() + mock_client.db = {"proposed_changes": mock_collection} + mock_get_client.return_value = mock_client + + proposal_id = uuid4() + universe_id = uuid4() + created_at = datetime.now(timezone.utc) + + # Mock existing document (pending) + existing_doc = { + "proposal_id": str(proposal_id), + "change_type": ProposalType.FACT.value, + "content": {"statement": "Test fact"}, + "scene_id": None, + "story_id": None, + "universe_id": str(universe_id), + "turn_id": None, + "confidence": 1.0, + "authority": Authority.SYSTEM.value, + "evidence_refs": [], + "proposed_by": "Narrator", + "status": ProposalStatus.PENDING.value, + "decision_reason": None, + "canonical_ref": None, + "decided_by": None, + "created_at": created_at, + "decided_at": None, + } + + # Mock updated document (rejected) + decided_at = datetime.now(timezone.utc) + updated_doc = { + **existing_doc, + "status": ProposalStatus.REJECTED.value, + "decision_reason": "Contradicts existing canon", + "decided_by": "CanonKeeper", + "decided_at": decided_at, + } + + mock_collection.find_one.side_effect = [existing_doc, updated_doc] + + params = ProposedChangeUpdate( + status=ProposalStatus.REJECTED, + decision_reason="Contradicts existing canon", + decided_by="CanonKeeper", + ) + + result = mongodb_update_proposed_change(proposal_id, params) + + # Assertions + assert result.status == ProposalStatus.REJECTED + assert result.decision_reason == "Contradicts existing canon" + assert result.canonical_ref is None + assert mock_collection.update_one.call_count == 1 + + +@patch("monitor_data.tools.mongodb_tools.get_mongodb_client") +def test_update_invalid_transition(mock_get_client: Mock): + """Test invalid status transition (accepted → pending).""" + # Setup mock + mock_client = MagicMock() + mock_collection = MagicMock() + mock_client.db = {"proposed_changes": mock_collection} + mock_get_client.return_value = mock_client + + proposal_id = uuid4() + universe_id = uuid4() + created_at = datetime.now(timezone.utc) + + # Mock existing document (already accepted) + existing_doc = { + "proposal_id": str(proposal_id), + "change_type": ProposalType.FACT.value, + "content": {"statement": "Test fact"}, + "scene_id": None, + "story_id": None, + "universe_id": str(universe_id), + "turn_id": None, + "confidence": 1.0, + "authority": Authority.SYSTEM.value, + "evidence_refs": [], + "proposed_by": "Narrator", + "status": ProposalStatus.ACCEPTED.value, + "decision_reason": "Already accepted", + "canonical_ref": str(uuid4()), + "decided_by": "CanonKeeper", + "created_at": created_at, + "decided_at": created_at, + } + + mock_collection.find_one.return_value = existing_doc + + params = ProposedChangeUpdate( + status=ProposalStatus.REJECTED, + decision_reason="Trying to change", + decided_by="CanonKeeper", + ) + + # Should raise ValueError + with pytest.raises(ValueError, match="Cannot update ProposedChange with status"): + mongodb_update_proposed_change(proposal_id, params) + + +@patch("monitor_data.tools.mongodb_tools.get_mongodb_client") +def test_update_transition_to_pending_fails(mock_get_client: Mock): + """Test that transitioning to pending status fails.""" + # Setup mock + mock_client = MagicMock() + mock_collection = MagicMock() + mock_client.db = {"proposed_changes": mock_collection} + mock_get_client.return_value = mock_client + + proposal_id = uuid4() + universe_id = uuid4() + created_at = datetime.now(timezone.utc) + + # Mock existing document (pending) + existing_doc = { + "proposal_id": str(proposal_id), + "change_type": ProposalType.FACT.value, + "content": {"statement": "Test fact"}, + "scene_id": None, + "story_id": None, + "universe_id": str(universe_id), + "turn_id": None, + "confidence": 1.0, + "authority": Authority.SYSTEM.value, + "evidence_refs": [], + "proposed_by": "Narrator", + "status": ProposalStatus.PENDING.value, + "decision_reason": None, + "canonical_ref": None, + "decided_by": None, + "created_at": created_at, + "decided_at": None, + } + + mock_collection.find_one.return_value = existing_doc + + params = ProposedChangeUpdate( + status=ProposalStatus.PENDING, + decision_reason="Trying to set to pending", + decided_by="CanonKeeper", + ) + + # Should raise ValueError + with pytest.raises(ValueError, match="Cannot transition to 'pending'"): + mongodb_update_proposed_change(proposal_id, params) + + +@patch("monitor_data.tools.mongodb_tools.get_mongodb_client") +def test_update_accept_without_canonical_ref_fails(mock_get_client: Mock): + """Test that accepting without canonical_ref fails.""" + # Setup mock + mock_client = MagicMock() + mock_collection = MagicMock() + mock_client.db = {"proposed_changes": mock_collection} + mock_get_client.return_value = mock_client + + proposal_id = uuid4() + universe_id = uuid4() + created_at = datetime.now(timezone.utc) + + # Mock existing document (pending) + existing_doc = { + "proposal_id": str(proposal_id), + "change_type": ProposalType.FACT.value, + "content": {"statement": "Test fact"}, + "scene_id": None, + "story_id": None, + "universe_id": str(universe_id), + "turn_id": None, + "confidence": 1.0, + "authority": Authority.SYSTEM.value, + "evidence_refs": [], + "proposed_by": "Narrator", + "status": ProposalStatus.PENDING.value, + "decision_reason": None, + "canonical_ref": None, + "decided_by": None, + "created_at": created_at, + "decided_at": None, + } + + mock_collection.find_one.return_value = existing_doc + + params = ProposedChangeUpdate( + status=ProposalStatus.ACCEPTED, + decision_reason="Valid fact", + canonical_ref=None, # Missing canonical_ref + decided_by="CanonKeeper", + ) + + # Should raise ValueError + with pytest.raises(ValueError, match="canonical_ref is required"): + mongodb_update_proposed_change(proposal_id, params) + + +@patch("monitor_data.tools.mongodb_tools.get_mongodb_client") +def test_update_not_found(mock_get_client: Mock): + """Test updating a non-existent proposed change.""" + # Setup mock + mock_client = MagicMock() + mock_collection = MagicMock() + mock_client.db = {"proposed_changes": mock_collection} + mock_get_client.return_value = mock_client + + mock_collection.find_one.return_value = None + + params = ProposedChangeUpdate( + status=ProposalStatus.ACCEPTED, + canonical_ref=uuid4(), + decided_by="CanonKeeper", + ) + + # Should raise ValueError + with pytest.raises(ValueError, match="not found"): + mongodb_update_proposed_change(uuid4(), params) From f996a42d8b4b53c6d70eb0c6f4368e8044f83253 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Sun, 28 Dec 2025 18:21:14 +0000 Subject: [PATCH 3/3] Address code review: thread-safe singleton and atomic updates Co-authored-by: spuentesp <112034353+spuentesp@users.noreply.github.com> --- .../data-layer/src/monitor_data/db/mongodb.py | 14 +++-- .../src/monitor_data/tools/mongodb_tools.py | 42 ++++++++------- .../test_tools/test_proposed_change_tools.py | 53 +++++++------------ 3 files changed, 52 insertions(+), 57 deletions(-) diff --git a/packages/data-layer/src/monitor_data/db/mongodb.py b/packages/data-layer/src/monitor_data/db/mongodb.py index c8586f9..c8a93c5 100644 --- a/packages/data-layer/src/monitor_data/db/mongodb.py +++ b/packages/data-layer/src/monitor_data/db/mongodb.py @@ -10,6 +10,7 @@ """ import os +import threading from typing import Optional from pymongo import MongoClient from pymongo.database import Database @@ -20,7 +21,8 @@ class MongoDBClient: MongoDB client for MONITOR narrative documents. Provides access to MongoDB collections with connection pooling. - Thread-safe client suitable for use across requests. + The underlying PyMongo MongoClient is thread-safe and can be used + across multiple threads/requests safely. """ def __init__( @@ -89,19 +91,23 @@ def verify_connectivity(self) -> bool: # ============================================================================= _mongodb_client: Optional[MongoDBClient] = None +_mongodb_client_lock = threading.Lock() def get_mongodb_client() -> MongoDBClient: """ - Get or create the singleton MongoDB client. + Get or create the singleton MongoDB client (thread-safe). Returns: MongoDBClient instance (connected) """ global _mongodb_client if _mongodb_client is None: - _mongodb_client = MongoDBClient() - _mongodb_client.connect() + with _mongodb_client_lock: + # Double-check pattern to avoid race condition + if _mongodb_client is None: + _mongodb_client = MongoDBClient() + _mongodb_client.connect() return _mongodb_client diff --git a/packages/data-layer/src/monitor_data/tools/mongodb_tools.py b/packages/data-layer/src/monitor_data/tools/mongodb_tools.py index 03410b5..2013458 100644 --- a/packages/data-layer/src/monitor_data/tools/mongodb_tools.py +++ b/packages/data-layer/src/monitor_data/tools/mongodb_tools.py @@ -13,6 +13,8 @@ from typing import Dict, List, Optional, Any from uuid import UUID, uuid4 +from pymongo import ReturnDocument + from monitor_data.db.mongodb import get_mongodb_client from monitor_data.schemas.proposed_changes import ( ProposedChangeCreate, @@ -207,19 +209,7 @@ def mongodb_update_proposed_change( client = get_mongodb_client() collection = client.db["proposed_changes"] - # Get existing document - document = collection.find_one({"proposal_id": str(proposal_id)}) - if not document: - raise ValueError(f"ProposedChange {proposal_id} not found") - - # Validate status transition - current_status = ProposalStatus(document["status"]) - if current_status != ProposalStatus.PENDING: - raise ValueError( - f"Cannot update ProposedChange with status '{current_status.value}'. " - "Only 'pending' proposals can be updated to 'accepted' or 'rejected'." - ) - + # Validate status transition rules if params.status == ProposalStatus.PENDING: raise ValueError( "Cannot transition to 'pending' status. " @@ -233,7 +223,7 @@ def mongodb_update_proposed_change( "It must reference the created Neo4j node/edge ID." ) - # Build update + # Build update atomically using find_one_and_update with status validation decided_at = datetime.now(timezone.utc) update_doc = { "$set": { @@ -245,16 +235,28 @@ def mongodb_update_proposed_change( } } - # Update document - collection.update_one( - {"proposal_id": str(proposal_id)}, + # Use find_one_and_update for atomic operation with status validation + # Only update if current status is PENDING + updated_doc = collection.find_one_and_update( + { + "proposal_id": str(proposal_id), + "status": ProposalStatus.PENDING.value, # Only update pending proposals + }, update_doc, + return_document=ReturnDocument.AFTER, # Return document after update ) - # Get updated document - updated_doc = collection.find_one({"proposal_id": str(proposal_id)}) if not updated_doc: - raise ValueError(f"ProposedChange {proposal_id} not found after update") + # Either proposal doesn't exist or status is not PENDING + existing = collection.find_one({"proposal_id": str(proposal_id)}) + if not existing: + raise ValueError(f"ProposedChange {proposal_id} not found") + + current_status = ProposalStatus(existing["status"]) + raise ValueError( + f"Cannot update ProposedChange with status '{current_status.value}'. " + "Only 'pending' proposals can be updated to 'accepted' or 'rejected'." + ) return _document_to_response(updated_doc) diff --git a/packages/data-layer/tests/test_tools/test_proposed_change_tools.py b/packages/data-layer/tests/test_tools/test_proposed_change_tools.py index 00960f0..09f527f 100644 --- a/packages/data-layer/tests/test_tools/test_proposed_change_tools.py +++ b/packages/data-layer/tests/test_tools/test_proposed_change_tools.py @@ -398,9 +398,10 @@ def test_update_accept(mock_get_client: Mock): universe_id = uuid4() canonical_ref = uuid4() created_at = datetime.now(timezone.utc) + decided_at = datetime.now(timezone.utc) - # Mock existing document (pending) - existing_doc = { + # Mock updated document (accepted) returned by find_one_and_update + updated_doc = { "proposal_id": str(proposal_id), "change_type": ProposalType.FACT.value, "content": {"statement": "Test fact"}, @@ -412,27 +413,16 @@ def test_update_accept(mock_get_client: Mock): "authority": Authority.SYSTEM.value, "evidence_refs": [], "proposed_by": "Narrator", - "status": ProposalStatus.PENDING.value, - "decision_reason": None, - "canonical_ref": None, - "decided_by": None, - "created_at": created_at, - "decided_at": None, - } - - # Mock updated document (accepted) - decided_at = datetime.now(timezone.utc) - updated_doc = { - **existing_doc, "status": ProposalStatus.ACCEPTED.value, "decision_reason": "Valid fact", "canonical_ref": str(canonical_ref), "decided_by": "CanonKeeper", + "created_at": created_at, "decided_at": decided_at, } - # Setup mock to return existing doc first, then updated doc - mock_collection.find_one.side_effect = [existing_doc, updated_doc] + # Mock find_one_and_update to return updated document + mock_collection.find_one_and_update.return_value = updated_doc params = ProposedChangeUpdate( status=ProposalStatus.ACCEPTED, @@ -448,7 +438,7 @@ def test_update_accept(mock_get_client: Mock): assert result.decision_reason == "Valid fact" assert result.canonical_ref == canonical_ref assert result.decided_by == "CanonKeeper" - assert mock_collection.update_one.call_count == 1 + assert mock_collection.find_one_and_update.call_count == 1 @patch("monitor_data.tools.mongodb_tools.get_mongodb_client") @@ -463,9 +453,10 @@ def test_update_reject(mock_get_client: Mock): proposal_id = uuid4() universe_id = uuid4() created_at = datetime.now(timezone.utc) + decided_at = datetime.now(timezone.utc) - # Mock existing document (pending) - existing_doc = { + # Mock updated document (rejected) returned by find_one_and_update + updated_doc = { "proposal_id": str(proposal_id), "change_type": ProposalType.FACT.value, "content": {"statement": "Test fact"}, @@ -477,25 +468,15 @@ def test_update_reject(mock_get_client: Mock): "authority": Authority.SYSTEM.value, "evidence_refs": [], "proposed_by": "Narrator", - "status": ProposalStatus.PENDING.value, - "decision_reason": None, - "canonical_ref": None, - "decided_by": None, - "created_at": created_at, - "decided_at": None, - } - - # Mock updated document (rejected) - decided_at = datetime.now(timezone.utc) - updated_doc = { - **existing_doc, "status": ProposalStatus.REJECTED.value, "decision_reason": "Contradicts existing canon", + "canonical_ref": None, "decided_by": "CanonKeeper", + "created_at": created_at, "decided_at": decided_at, } - mock_collection.find_one.side_effect = [existing_doc, updated_doc] + mock_collection.find_one_and_update.return_value = updated_doc params = ProposedChangeUpdate( status=ProposalStatus.REJECTED, @@ -509,7 +490,7 @@ def test_update_reject(mock_get_client: Mock): assert result.status == ProposalStatus.REJECTED assert result.decision_reason == "Contradicts existing canon" assert result.canonical_ref is None - assert mock_collection.update_one.call_count == 1 + assert mock_collection.find_one_and_update.call_count == 1 @patch("monitor_data.tools.mongodb_tools.get_mongodb_client") @@ -546,6 +527,9 @@ def test_update_invalid_transition(mock_get_client: Mock): "decided_at": created_at, } + # find_one_and_update returns None (no matching document with pending status) + mock_collection.find_one_and_update.return_value = None + # find_one returns the existing accepted document mock_collection.find_one.return_value = existing_doc params = ProposedChangeUpdate( @@ -663,6 +647,9 @@ def test_update_not_found(mock_get_client: Mock): mock_client.db = {"proposed_changes": mock_collection} mock_get_client.return_value = mock_client + # find_one_and_update returns None (no document found) + mock_collection.find_one_and_update.return_value = None + # find_one also returns None (document doesn't exist) mock_collection.find_one.return_value = None params = ProposedChangeUpdate(