Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions packages/data-layer/src/monitor_data/db/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,16 @@
"""

from monitor_data.db.neo4j import Neo4jClient, get_neo4j_client
from monitor_data.db.mongodb import MongoDBClient, get_mongodb_client

# from monitor_data.db.mongodb import MongoDBClient
# from monitor_data.db.qdrant import QdrantClient
# from monitor_data.db.minio import MinIOClient

__all__ = [
"Neo4jClient",
"get_neo4j_client",
# "MongoDBClient",
"MongoDBClient",
"get_mongodb_client",
# "QdrantClient",
# "MinIOClient",
]
119 changes: 119 additions & 0 deletions packages/data-layer/src/monitor_data/db/mongodb.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
"""
MongoDB client for MONITOR Data Layer.

LAYER: 1 (data-layer)
IMPORTS FROM: External libraries only
CALLED BY: mongodb_tools.py

This provides a thin wrapper around pymongo for narrative document storage.
Collections: scenes, turns, proposed_changes, resolutions, memories, etc.
"""

import os
import threading
from typing import Optional
from pymongo import MongoClient
from pymongo.database import Database


class MongoDBClient:
"""
MongoDB client for MONITOR narrative documents.

Provides access to MongoDB collections with connection pooling.
The underlying PyMongo MongoClient is thread-safe and can be used
across multiple threads/requests safely.
"""

def __init__(
self,
uri: Optional[str] = None,
database: str = "monitor",
):
"""
Initialize MongoDB client.

Args:
uri: MongoDB connection URI (defaults to MONGODB_URI env var)
database: Database name (defaults to "monitor")
"""
self.uri = uri or os.getenv("MONGODB_URI", "mongodb://localhost:27017")
self.database_name = database
self._client: Optional[MongoClient] = None
self._db: Optional[Database] = None

def connect(self) -> None:
"""Establish connection to MongoDB."""
if self._client is None:
self._client = MongoClient(self.uri)
self._db = self._client[self.database_name]

def close(self) -> None:
"""Close MongoDB connection."""
if self._client:
self._client.close()
self._client = None
self._db = None

@property
def db(self) -> Database:
"""
Get the MongoDB database instance.

Returns:
MongoDB Database object

Raises:
RuntimeError: If not connected
"""
if self._db is None:
raise RuntimeError("MongoDB client not connected. Call connect() first.")
return self._db

def verify_connectivity(self) -> bool:
"""
Verify MongoDB connection is working.

Returns:
True if connection is healthy
"""
try:
if self._client:
self._client.admin.command("ping")
return True
return False
except Exception:
return False


# =============================================================================
# SINGLETON CLIENT
# =============================================================================

_mongodb_client: Optional[MongoDBClient] = None
_mongodb_client_lock = threading.Lock()


def get_mongodb_client() -> MongoDBClient:
"""
Get or create the singleton MongoDB client (thread-safe).

Returns:
MongoDBClient instance (connected)
"""
global _mongodb_client
if _mongodb_client is None:
with _mongodb_client_lock:
# Double-check pattern to avoid race condition
if _mongodb_client is None:
_mongodb_client = MongoDBClient()
_mongodb_client.connect()
return _mongodb_client


def close_mongodb_client() -> None:
"""Close the singleton MongoDB client."""
global _mongodb_client
if _mongodb_client:
_mongodb_client.close()
_mongodb_client = None
7 changes: 6 additions & 1 deletion packages/data-layer/src/monitor_data/middleware/auth.py
Original file line number Diff line number Diff line change
Expand Up @@ -100,8 +100,13 @@
"mongodb_get_turns": ["*"],
"mongodb_undo_turn": ["Orchestrator"],
# =========================================================================
# MONGODB OPERATIONS - Proposals
# MONGODB OPERATIONS - Proposals (DL-5)
# =========================================================================
"mongodb_create_proposed_change": ["*"],
"mongodb_get_proposed_change": ["*"],
"mongodb_list_proposed_changes": ["*"],
"mongodb_update_proposed_change": ["CanonKeeper"],
# Legacy proposal operations (deprecated, use proposed_change instead)
"mongodb_create_proposal": ["Narrator", "Resolver", "CanonKeeper"],
"mongodb_get_proposals": ["*"],
"mongodb_update_proposal": ["CanonKeeper"],
Expand Down
165 changes: 165 additions & 0 deletions packages/data-layer/src/monitor_data/schemas/proposed_changes.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,165 @@
"""
Pydantic schemas for ProposedChange operations in MongoDB.

LAYER: 1 (data-layer)
IMPORTS FROM: External libraries (pydantic, uuid, datetime) and base schemas
CALLED BY: mongodb_tools.py

These schemas define the data contracts for ProposedChange CRUD operations.
ProposedChanges are staging documents that CanonKeeper evaluates at scene end.
"""

from datetime import datetime
from typing import Optional, List, Dict, Any
from uuid import UUID

from pydantic import BaseModel, Field

from monitor_data.schemas.base import Authority, ProposalStatus, ProposalType


# =============================================================================
# PROPOSED CHANGE SCHEMAS
# =============================================================================


class EvidenceRef(BaseModel):
"""Reference to supporting evidence for a proposed change."""

type: str = Field(
description="Type of evidence: turn, snippet, source, fact, rule"
)
ref_id: UUID = Field(description="UUID of the referenced document/node")


class ProposedChangeCreate(BaseModel):
"""Request to create a ProposedChange document."""

change_type: ProposalType = Field(
description="Type of change being proposed"
)
content: Dict[str, Any] = Field(
description="Flexible JSON payload specific to change_type"
)
scene_id: Optional[UUID] = Field(
None,
description="Scene this change was proposed in (optional for system/ingest)",
)
story_id: Optional[UUID] = Field(
None,
description="Story this change relates to",
)
universe_id: UUID = Field(
description="Universe this change affects"
)
turn_id: Optional[UUID] = Field(
None,
description="Turn that proposed this change (if applicable)",
)
confidence: float = Field(
ge=0.0,
le=1.0,
default=1.0,
description="Confidence level in this proposal",
)
authority: Authority = Field(
default=Authority.SYSTEM,
description="Who is proposing this change",
)
evidence_refs: List[EvidenceRef] = Field(
default_factory=list,
description="Supporting evidence for this proposal",
)
proposed_by: str = Field(
description="Agent or system component that proposed this change"
)


class ProposedChangeUpdate(BaseModel):
"""Request to update a ProposedChange (status transitions)."""

status: ProposalStatus = Field(
description="New status: accepted or rejected"
)
decision_reason: Optional[str] = Field(
None,
max_length=2000,
description="Rationale for accepting/rejecting",
)
canonical_ref: Optional[UUID] = Field(
None,
description="If accepted, the Neo4j node/edge ID that was created",
)
decided_by: str = Field(
default="CanonKeeper",
description="Agent that made the decision (usually CanonKeeper)",
)


class ProposedChangeResponse(BaseModel):
"""Response with ProposedChange document data."""

proposal_id: UUID
change_type: ProposalType
content: Dict[str, Any]
scene_id: Optional[UUID]
story_id: Optional[UUID]
universe_id: UUID
turn_id: Optional[UUID]
confidence: float
authority: Authority
evidence_refs: List[EvidenceRef]
proposed_by: str
status: ProposalStatus
decision_reason: Optional[str] = None
canonical_ref: Optional[UUID] = None
decided_by: Optional[str] = None
created_at: datetime
decided_at: Optional[datetime] = None

model_config = {"from_attributes": True}


class ProposedChangeFilter(BaseModel):
"""Filter parameters for listing proposed changes."""

scene_id: Optional[UUID] = Field(
None,
description="Filter by scene ID",
)
story_id: Optional[UUID] = Field(
None,
description="Filter by story ID",
)
universe_id: Optional[UUID] = Field(
None,
description="Filter by universe ID",
)
status: Optional[ProposalStatus] = Field(
None,
description="Filter by status",
)
change_type: Optional[ProposalType] = Field(
None,
description="Filter by change type",
)
limit: int = Field(
default=50,
ge=1,
le=1000,
description="Maximum number of results",
)
offset: int = Field(
default=0,
ge=0,
description="Number of results to skip",
)


class ProposedChangeListResponse(BaseModel):
"""Response with list of proposed changes and pagination info."""

proposals: List[ProposedChangeResponse]
total: int
limit: int
offset: int
Loading