Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions packages/data-layer/src/monitor_data/db/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,16 @@
"""

from monitor_data.db.neo4j import Neo4jClient, get_neo4j_client
from monitor_data.db.mongodb import MongoDBClient, get_mongodb_client

# from monitor_data.db.mongodb import MongoDBClient
# from monitor_data.db.qdrant import QdrantClient
# from monitor_data.db.minio import MinIOClient

__all__ = [
"Neo4jClient",
"get_neo4j_client",
# "MongoDBClient",
"MongoDBClient",
"get_mongodb_client",
# "QdrantClient",
# "MinIOClient",
]
194 changes: 194 additions & 0 deletions packages/data-layer/src/monitor_data/db/mongodb.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,194 @@
"""
MongoDB client for MONITOR Data Layer.

LAYER: 1 (data-layer)
IMPORTS FROM: External libraries only (pymongo)
CALLED BY: mongodb_tools.py

This client provides a thin wrapper around pymongo for:
- Scenes: Narrative episodes and their turns
- ProposedChanges: Staging area for canonization
- Memories: Agent and character memories
- Documents: Ingested source documents

Collections:
- scenes: Scene documents with embedded turns
- proposed_changes: Proposed changes awaiting canonization
- memories: Character and agent memories
- documents: Source documents and metadata
- snippets: Document snippets with embeddings
"""

import os
from typing import Optional
from pymongo import MongoClient, ASCENDING, DESCENDING
from pymongo.database import Database
from pymongo.collection import Collection


class MongoDBClient:
"""
MongoDB client for MONITOR narrative and document storage.

Thread-safe singleton client for MongoDB operations.
Manages connection lifecycle and provides collection access.
"""

def __init__(
self,
uri: Optional[str] = None,
database: Optional[str] = None,
):
"""
Initialize MongoDB client.

Args:
uri: MongoDB connection URI (default: from MONGODB_URI env var)
database: Database name (default: from MONGODB_DATABASE env var or "monitor")
"""
self.uri = uri or os.getenv("MONGODB_URI", "mongodb://localhost:27017")
self.database_name = database or os.getenv("MONGODB_DATABASE", "monitor")
self._client = None
self._db = None
self._indexes_created = False

def connect(self) -> None:
"""
Establish connection to MongoDB.

Creates indexes for all collections on first connection.
"""
if self._client is None:
self._client = MongoClient(self.uri)
self._db = self._client[self.database_name]
if not self._indexes_created:
self._create_indexes()
self._indexes_created = True

def close(self) -> None:
"""Close MongoDB connection."""
if self._client:
self._client.close()
self._client = None
self._db = None
self._indexes_created = False

def verify_connectivity(self) -> bool:
"""
Verify MongoDB connection is working.

Returns:
True if connection is healthy, False otherwise
"""
try:
if self._client is None:
self.connect()
# Ping the server
self._client.admin.command("ping")
return True
except Exception:
return False

def get_database(self) -> Database:
"""
Get the MongoDB database object.

Returns:
pymongo Database object

Raises:
RuntimeError: If not connected
"""
if self._db is None:
raise RuntimeError("MongoDB client not connected. Call connect() first.")
return self._db

def get_collection(self, name: str) -> Collection:
"""
Get a collection by name.

Args:
name: Collection name

Returns:
pymongo Collection object
"""
db = self.get_database()
return db[name]

def _create_indexes(self) -> None:
"""
Create indexes for all collections.

Called automatically on first connection.
"""
if self._db is None:
return

# Scenes collection indexes
scenes = self._db["scenes"]
scenes.create_index([("scene_id", ASCENDING)], unique=True)
scenes.create_index([("story_id", ASCENDING), ("order", ASCENDING)])
scenes.create_index([("status", ASCENDING)])
scenes.create_index([("created_at", DESCENDING)])

# Proposed changes collection indexes
proposed_changes = self._db["proposed_changes"]
proposed_changes.create_index([("proposal_id", ASCENDING)], unique=True)
proposed_changes.create_index([("scene_id", ASCENDING), ("status", ASCENDING)])
proposed_changes.create_index([("status", ASCENDING)])

# Memories collection indexes
memories = self._db["memories"]
memories.create_index([("memory_id", ASCENDING)], unique=True)
memories.create_index([("entity_id", ASCENDING)])
memories.create_index([("created_at", DESCENDING)])

# Documents collection indexes
documents = self._db["documents"]
documents.create_index([("doc_id", ASCENDING)], unique=True)
documents.create_index([("universe_id", ASCENDING)])
documents.create_index([("status", ASCENDING)])

# Snippets collection indexes
snippets = self._db["snippets"]
snippets.create_index([("snippet_id", ASCENDING)], unique=True)
snippets.create_index([("doc_id", ASCENDING)])


# =============================================================================
# SINGLETON ACCESS
# =============================================================================

_mongodb_client_instance: Optional[MongoDBClient] = None


def get_mongodb_client() -> MongoDBClient:
"""
Get or create the singleton MongoDB client.

Returns:
MongoDBClient instance

Thread-safe singleton pattern for database connections.
"""
global _mongodb_client_instance

if _mongodb_client_instance is None:
_mongodb_client_instance = MongoDBClient()
_mongodb_client_instance.connect()

return _mongodb_client_instance


def reset_mongodb_client() -> None:
"""
Reset the MongoDB client singleton.

Used for testing to ensure clean state between tests.
"""
global _mongodb_client_instance

if _mongodb_client_instance is not None:
_mongodb_client_instance.close()
_mongodb_client_instance = None
6 changes: 3 additions & 3 deletions packages/data-layer/src/monitor_data/middleware/auth.py
Original file line number Diff line number Diff line change
Expand Up @@ -89,14 +89,14 @@
# =========================================================================
# MONGODB OPERATIONS - Scenes
# =========================================================================
"mongodb_create_scene": ["Orchestrator"],
"mongodb_create_scene": ["CanonKeeper", "Narrator"],
"mongodb_get_scene": ["*"],
"mongodb_update_scene": ["Orchestrator"],
"mongodb_update_scene": ["CanonKeeper", "Narrator"],
"mongodb_list_scenes": ["*"],
# =========================================================================
# MONGODB OPERATIONS - Turns
# =========================================================================
"mongodb_append_turn": ["Narrator", "Orchestrator"],
"mongodb_append_turn": ["*"],
"mongodb_get_turns": ["*"],
"mongodb_undo_turn": ["Orchestrator"],
# =========================================================================
Expand Down
141 changes: 141 additions & 0 deletions packages/data-layer/src/monitor_data/schemas/scenes.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,141 @@
"""
Pydantic schemas for Scene and Turn operations (MongoDB).

LAYER: 1 (data-layer)
IMPORTS FROM: External libraries (pydantic, uuid, datetime) and base schemas
CALLED BY: mongodb_tools.py

These schemas define the data contracts for Scene and Turn CRUD operations.
Scenes are narrative episodes stored in MongoDB for flexibility.
Turns are individual exchanges within scenes.
"""

from datetime import datetime
from typing import Optional, List
from uuid import UUID

from pydantic import BaseModel, Field, field_validator

from monitor_data.schemas.base import SceneStatus, Speaker


# =============================================================================
# TURN SCHEMAS
# =============================================================================


class TurnCreate(BaseModel):
"""Request to create a Turn (append to scene)."""

speaker: Speaker
entity_id: Optional[UUID] = Field(
None, description="Entity ID if speaker is entity"
)
text: str = Field(min_length=1, max_length=10000)

@field_validator("entity_id")
@classmethod
def validate_entity_speaker(cls, v: Optional[UUID], info) -> Optional[UUID]:
"""Validate that entity_id is provided when speaker is entity."""
if info.data.get("speaker") == Speaker.ENTITY and v is None:
raise ValueError("entity_id required when speaker is entity")
return v


class TurnResponse(BaseModel):
"""Response with Turn data."""

turn_id: UUID
speaker: Speaker
entity_id: Optional[UUID] = None
text: str
timestamp: datetime
resolution_ref: Optional[UUID] = Field(
None, description="Reference to resolution document"
)

model_config = {"from_attributes": True}


# =============================================================================
# SCENE SCHEMAS
# =============================================================================


class SceneCreate(BaseModel):
"""Request to create a Scene."""

story_id: UUID
universe_id: UUID
title: str = Field(min_length=1, max_length=200)
purpose: str = Field(
default="", max_length=1000, description="Scene purpose or goal"
)
order: Optional[int] = Field(None, ge=0, description="Scene order in story")
location_ref: Optional[UUID] = Field(
None, description="EntityInstance ID for location"
)
participating_entities: List[UUID] = Field(
default_factory=list, description="EntityInstance IDs of participants"
)
status: SceneStatus = Field(default=SceneStatus.ACTIVE)


class SceneUpdate(BaseModel):
"""Request to update a Scene.

Enforces valid status transitions.
"""

title: Optional[str] = Field(None, min_length=1, max_length=200)
purpose: Optional[str] = Field(None, max_length=1000)
status: Optional[SceneStatus] = None
summary: Optional[str] = Field(None, max_length=5000, description="Scene summary")


class SceneResponse(BaseModel):
"""Response with Scene data."""

scene_id: UUID
story_id: UUID
universe_id: UUID
title: str
purpose: str
status: SceneStatus
order: Optional[int] = None
location_ref: Optional[UUID] = None
participating_entities: List[UUID] = Field(default_factory=list)
turns: List[TurnResponse] = Field(default_factory=list)
proposed_changes: List[UUID] = Field(default_factory=list)
canonical_outcomes: List[UUID] = Field(default_factory=list)
summary: str = Field(default="")
created_at: datetime
updated_at: datetime
completed_at: Optional[datetime] = None

model_config = {"from_attributes": True}


class SceneFilter(BaseModel):
"""Filter parameters for listing scenes."""

story_id: Optional[UUID] = None
universe_id: Optional[UUID] = None
status: Optional[SceneStatus] = None
limit: int = Field(default=50, ge=1, le=1000)
offset: int = Field(default=0, ge=0)
sort_by: str = Field(
default="created_at", description="Field to sort by: created_at, order"
)
sort_order: str = Field(
default="desc", description="Sort order: asc, desc", pattern="^(asc|desc)$"
)


class SceneListResponse(BaseModel):
"""Response with list of scenes and pagination info."""

scenes: List[SceneResponse]
total: int
limit: int
offset: int
Loading