Conversation
Addresses two concrete bottlenecks from the architecture audit: 1. EntityReader.filter_defined_entities used to pull every node and every edge of a graph into Python memory and filter there. For graphs with more than a few thousand entities this caused large memory spikes and latency. The filtering, type whitelist check, and adjacency lookup now run inside Neo4j via a new GraphStorage.get_filtered_entities_with_edges method. EntityReader only assembles EntityNode dataclasses from the returned rows. 2. GraphMemoryUpdater held an unbounded queue of agent activities. When Neo4j ingestion was slower than OASIS event generation the queue grew without limit and could OOM the backend. The queue is now bounded via GRAPH_MEMORY_QUEUE_MAX (default 10000) with a put-side timeout (GRAPH_MEMORY_PUT_TIMEOUT) that applies backpressure and, on overflow, drops events with a dropped_count stat instead of crashing. Public contracts are preserved: FilteredEntities keeps the same shape and callers in simulation_entities / simulation_prepare / simulation_history / simulation_manager do not need changes. GraphMemoryUpdater's constructor keeps backwards compatibility via default parameters, and max_queue_size=0 still gives the old unbounded behaviour as an opt-in. Tests: 14 new tests covering EntityReader delegation, Neo4jStorage dict assembly (direction logic, dedup, type whitelist as parameter, edgeless path), and the bounded queue (drop on overflow, DO_NOTHING skip path, unbounded opt-in, stats surface). Quality gate: 102/102 backend tests green, scoped ruff clean, frontend lint + build clean. https://claude.ai/code/session_01CiS1Gg8J8YBkRy3S2QJvPi
There was a problem hiding this comment.
Code Review
This pull request introduces a Cypher-pushdown optimization for entity filtering and adjacency fetching, delegating these operations to Neo4j to reduce memory consumption in the backend. It also implements a bounded queue with backpressure in the GraphMemoryUpdater to protect against OOM crashes during high-load simulations. The updates include new configuration options, storage interface extensions, and a suite of unit tests. Reviewer feedback suggests adopting f-strings for logging to maintain stylistic consistency with the rest of the project.
| logger.info( | ||
| "GraphMemoryUpdater initialized: graph_id=%s, batch_size=%s, " | ||
| "queue_max=%s, put_timeout=%ss", | ||
| graph_id, | ||
| self.BATCH_SIZE, | ||
| self._max_queue_size or "unbounded", | ||
| self._put_timeout, | ||
| ) |
There was a problem hiding this comment.
Zur Verbesserung der Konsistenz im Code-Stil würde ich vorschlagen, hier f-Strings für das Logging zu verwenden, so wie es auch in anderen Teilen dieser Datei (stop()-Methode) und des Projekts der Fall ist.
Die Verwendung von %-Formatierung ist zwar für das Logging nicht falsch, aber die Mischung verschiedener Stile kann die Lesbarkeit und Wartbarkeit erschweren.
logger.info(
f"GraphMemoryUpdater initialized: graph_id={graph_id}, batch_size={self.BATCH_SIZE}, "
f"queue_max={self._max_queue_size or 'unbounded'}, put_timeout={self._put_timeout}s"
)| logger.error( | ||
| "Activity queue full for graph %s (maxsize=%s) — dropping " | ||
| "event: %s by %s", | ||
| self.graph_id, | ||
| self._max_queue_size, | ||
| activity.action_type, | ||
| activity.agent_name, | ||
| ) |
There was a problem hiding this comment.
Auch hier wird die %-Formatierung für das Logging verwendet, während an anderen Stellen f-Strings genutzt werden. Um die Konsistenz im Code zu wahren, empfehle ich, auch diesen Log-Aufruf auf f-Strings umzustellen.
logger.error(
f"Activity queue full for graph {self.graph_id} (maxsize={self._max_queue_size}) — dropping "
f"event: {activity.action_type} by {activity.agent_name}"
)
Summary
Setzt die zwei konkret umrissenen Refactorings aus dem Architektur-Audit um:
EntityReader— stattget_all_nodes + get_all_edgesim RAM zu filtern, läuft Filterung + Adjazenz jetzt als Cypher-Query in Neo4j. Neue Storage-MethodeGraphStorage.get_filtered_entities_with_edges. Kontrakt vonFilteredEntitiesbleibt identisch; die vier bestehenden Call-Sites (simulation_entities,simulation_prepare,simulation_history,simulation_manager) brauchen keine Änderung.GraphMemoryUpdater— unbegrenzteQueue()ersetzt durchQueue(maxsize=GRAPH_MEMORY_QUEUE_MAX)mitput(timeout=GRAPH_MEMORY_PUT_TIMEOUT). Bei Überlauf wird das Event verworfen (neuerdropped_countin den Stats), statt OOM zu triggern.max_queue_size=0behält die alte unbounded-Semantik als Opt-in.Die großen Architektur-Vorschläge aus dem Audit (Event-Bus-IPC via Redis, Temporal Graph Evolution, Dynamic Ontology Mutation, Multi-Agent Consensus Metrics) sind als separate GitHub-Issues eingetragen — siehe unten in der Issue-Liste.
Konkrete Änderungen
backend/app/storage/graph_storage.py— neue abstrakte Methodeget_filtered_entities_with_edgesbackend/app/storage/neo4j_storage.py— Cypher-Implementierung mit zwei Varianten (mit/ohne Edge-Enrichment) und defensivem Post-Processing (Direction-Logic, Dedup, Typ-Whitelist als Parameter, nicht als Interpolation)backend/app/services/entity_reader.py— delegiert jetzt an die Storage-Methodebackend/app/services/graph_memory_updater.py— bounded Queue,dropped_count, Config-Anbindungbackend/app/config.py—GRAPH_MEMORY_QUEUE_MAX(10000),GRAPH_MEMORY_PUT_TIMEOUT(2.0)package.json—lint:backend-Scope um refaktorierte Dateien und neue Tests erweitertCHANGELOG.md— Eintrag unter v0.4.1test_entity_reader.py,test_graph_memory_updater.py,test_neo4j_filtered_entities.pyTest plan
npm run test:backend— 102/102 grünnpm run lint:backend(gescopter Rollout, inkl. neuer Dateien) — cleannpm run lint:frontend— cleannpm run build:frontend— cleanuv run python -m compileall app scripts— cleanAusstehend vor Merge (benötigt laufendes Neo4j): Live-Probe des neuen Cypher-Queries gegen einen real gebauten Graphen, inkl. Messung der Memory/Latency-Verbesserung auf einem Graphen mit >1k Entitäten.
https://claude.ai/code/session_01CiS1Gg8J8YBkRy3S2QJvPi
Generated by Claude Code