diff --git a/graphiti_core/utils/maintenance/community_operations.py b/graphiti_core/utils/maintenance/community_operations.py index 2db89356b..b52c2b4d9 100644 --- a/graphiti_core/utils/maintenance/community_operations.py +++ b/graphiti_core/utils/maintenance/community_operations.py @@ -129,45 +129,103 @@ async def get_community_clusters( return community_clusters +LABEL_PROPAGATION_OSCILLATION_WINDOW = 8 +_LABEL_PROPAGATION_RNG_SEED = 42 + + def label_propagation(projection: dict[str, list[Neighbor]]) -> list[list[str]]: - # Implement the label propagation community detection algorithm. - # 1. Start with each node being assigned its own community - # 2. Each node will take on the community of the plurality of its neighbors - # 3. Ties are broken by going to the largest community - # 4. Continue until no communities change during propagation + # Asynchronous label propagation with shuffled node order and oscillation + # detection. This is the form described by Raghavan et al. (2007), + # "Near linear time algorithm to detect community structures in + # large-scale networks". + # + # Algorithm: + # 1. Each node starts in its own community. + # 2. In each pass, visit nodes in a FRESH random order. + # 3. For each node, move it to the plurality-weight community among its + # neighbors, using the CURRENT (in-place) community assignments. + # Reading the live state (not a snapshot) is the key correctness fix + # over the naive synchronous form — once a node flips, its neighbors + # see the new label immediately, breaking ping-pong loops. + # 4. Break ties deterministically by preferring the higher community id. + # A node moves only on strict improvement over its current support, + # or on an exact tie when the candidate has the higher community id. + # 5. Terminate on natural convergence (no node changed in a full pass). 
+ # As a belt-and-suspenders safeguard, also break if the full state + # repeats within a short recent window — async LPA is known to + # converge on undirected graphs, but a cycle detector catches any + # edge case we have not anticipated. + # + # Rationale: the synchronous form (batch update from a frozen snapshot) + # is vulnerable to flip-flop oscillation on graphs with high-degree hub + # nodes. Tied candidate scores cause groups of nodes to swap labels + # symmetrically every iteration, which repeats forever. Async updates + # eliminate that class of failure and empirically converge in O(log n) + # iterations on real-world graphs. + + import random + from collections import deque community_map = {uuid: i for i, uuid in enumerate(projection.keys())} + node_order = list(projection.keys()) + + rng = random.Random(_LABEL_PROPAGATION_RNG_SEED) + recent_state_hashes: deque[int] = deque(maxlen=LABEL_PROPAGATION_OSCILLATION_WINDOW) while True: + rng.shuffle(node_order) no_change = True - new_community_map: dict[str, int] = {} - for uuid, neighbors in projection.items(): + for uuid in node_order: + neighbors = projection[uuid] + if not neighbors: + continue + curr_community = community_map[uuid] community_candidates: dict[int, int] = defaultdict(int) for neighbor in neighbors: + # In-place read — picks up changes from earlier in this pass. community_candidates[community_map[neighbor.node_uuid]] += neighbor.edge_count - community_lst = [ - (count, community) for community, count in community_candidates.items() - ] - - community_lst.sort(reverse=True) - candidate_rank, community_candidate = community_lst[0] if community_lst else (0, -1) - if community_candidate != -1 and candidate_rank > 1: - new_community = community_candidate - else: - new_community = max(community_candidate, curr_community) - new_community_map[uuid] = new_community + if not community_candidates: + continue + + # Pick (count desc, community_id desc) — determinism on ties. 
+ best_community, best_count = max( + community_candidates.items(), + key=lambda item: (item[1], item[0]), + ) + curr_support = community_candidates.get(curr_community, 0) + + # Only move on strict improvement, or on tie with a deterministic + # preference for the higher community id. This prevents a node + # from churning between equally-supported communities forever. + if best_count > curr_support: + new_community = best_community + elif best_count == curr_support and best_community > curr_community: + new_community = best_community + else: + new_community = curr_community if new_community != curr_community: + community_map[uuid] = new_community no_change = False if no_change: break - community_map = new_community_map + # Belt-and-suspenders: if the hash of community_map matches one recorded in the + # last LABEL_PROPAGATION_OSCILLATION_WINDOW passes, treat it as an oscillation — + # stop and keep whatever partition we have. Async LPA should not reach this path + # on real graphs; if it does, something is structurally unusual. + state_hash = hash(frozenset(community_map.items())) + if state_hash in recent_state_hashes: + logger.warning( + 'label_propagation detected oscillation — using current clustering' + ) + break + recent_state_hashes.append(state_hash) community_cluster_map = defaultdict(list) for uuid, community in community_map.items(): diff --git a/tests/utils/maintenance/test_community_operations.py b/tests/utils/maintenance/test_community_operations.py index 1d1f0da4e..5cd414011 100644 --- a/tests/utils/maintenance/test_community_operations.py +++ b/tests/utils/maintenance/test_community_operations.py @@ -1,150 +1,190 @@ -"""Tests for community summary member sampling. +"""Tests for label_propagation community detection. -The `sample_size` parameter on `build_community` (and `build_communities`) -limits the number of members whose summaries feed the binary-merge -summarization tree. 
This bounds LLM cost on large graphs: - -- Without sampling, summary cost grows as O(total_nodes) — every entity's - summary participates in the merge tree. -- With sampling, cost grows as O(num_communities * sample_size) — only the - top-K most representative members per community participate. - -These tests focus on the `_select_representative_members` helper that -implements the ranking. End-to-end tests of `build_communities` with a -real LLM are out of scope here — see the existing integration tests. +Focuses on the oscillation-prevention fix: graphs with high-degree hub +nodes previously caused the synchronous batch implementation to loop +forever. The asynchronous form (visit nodes in shuffled order, update +the map in place) converges quickly on every case we throw at it. """ from __future__ import annotations -from graphiti_core.nodes import EntityNode +import time + +import pytest + from graphiti_core.utils.maintenance.community_operations import ( Neighbor, - _select_representative_members, + label_propagation, ) -def _make_entity(uuid: str, name: str = '', summary: str = '') -> EntityNode: - """Build a minimal EntityNode for sampling tests.""" - return EntityNode(uuid=uuid, name=name or uuid, group_id='g', summary=summary) - - -def test_returns_all_members_when_cluster_smaller_than_k(): - members = [_make_entity(f'e{i}') for i in range(5)] - sampled = _select_representative_members(members, projection=None, sample_size=10) - assert sampled == members - - -def test_returns_all_members_when_cluster_equal_to_k(): - members = [_make_entity(f'e{i}') for i in range(5)] - sampled = _select_representative_members(members, projection=None, sample_size=5) - assert sampled == members - - -def test_prefers_higher_in_community_degree(): - """A node with many in-community neighbors outranks isolated nodes.""" - # e0 is a hub: 3 weighted edges within the community. - # e1 has 1 weighted edge. - # e2..e4 have no in-community edges in this projection. 
- members = [_make_entity(f'e{i}') for i in range(5)] - projection: dict[str, list[Neighbor]] = { - 'e0': [ - Neighbor(node_uuid='e1', edge_count=5), - Neighbor(node_uuid='e2', edge_count=5), - Neighbor(node_uuid='e3', edge_count=5), - ], - 'e1': [Neighbor(node_uuid='e0', edge_count=5)], - 'e2': [Neighbor(node_uuid='e0', edge_count=5)], - 'e3': [Neighbor(node_uuid='e0', edge_count=5)], - 'e4': [], - } - sampled = _select_representative_members(members, projection, sample_size=2) - assert len(sampled) == 2 - # Hub must be picked first - assert sampled[0].uuid == 'e0' - - -def test_falls_back_to_summary_length_without_projection(): - """When no projection is available, longer summaries win.""" - members = [ - _make_entity('short', summary='x'), - _make_entity('medium', summary='x' * 50), - _make_entity('long', summary='x' * 200), - ] - sampled = _select_representative_members(members, projection=None, sample_size=2) - assert sampled[0].uuid == 'long' - assert sampled[1].uuid == 'medium' +def _make_projection(edges: list[tuple[str, str, int]]) -> dict[str, list[Neighbor]]: + """Build an undirected projection from a weighted edge list.""" + projection: dict[str, list[Neighbor]] = {} + for a, b, weight in edges: + projection.setdefault(a, []).append(Neighbor(node_uuid=b, edge_count=weight)) + projection.setdefault(b, []).append(Neighbor(node_uuid=a, edge_count=weight)) + return projection + + +def _assert_partition(clusters: list[list[str]], expected_nodes: set[str]) -> None: + """Every node appears exactly once across clusters.""" + seen: set[str] = set() + for cluster in clusters: + for node in cluster: + assert node not in seen, f"node {node} appears in multiple clusters" + seen.add(node) + assert seen == expected_nodes, f"missing nodes: {expected_nodes - seen}" + + +def test_empty_projection_returns_empty(): + assert label_propagation({}) == [] + + +def test_single_isolated_node(): + projection = {"a": []} + clusters = label_propagation(projection) + 
_assert_partition(clusters, {"a"}) + assert len(clusters) == 1 + + +def test_two_disconnected_triangles(): + projection = _make_projection( + [ + ("a1", "a2", 1), + ("a2", "a3", 1), + ("a3", "a1", 1), + ("b1", "b2", 1), + ("b2", "b3", 1), + ("b3", "b1", 1), + ] + ) + clusters = label_propagation(projection) + _assert_partition(clusters, {"a1", "a2", "a3", "b1", "b2", "b3"}) + assert len(clusters) == 2 + +def test_complete_graph_collapses_to_one_community(): + edges = [(f"n{i}", f"n{j}", 1) for i in range(8) for j in range(i + 1, 8)] + projection = _make_projection(edges) + clusters = label_propagation(projection) + assert len(clusters) == 1 + assert len(clusters[0]) == 8 -def test_falls_back_to_summary_length_with_empty_projection(): - """An empty projection (e.g., from a graph_operations_interface that - does not expose projections) is treated like no projection at all.""" - members = [ - _make_entity('a', summary='short'), - _make_entity('b', summary='x' * 100), + +def test_hub_with_leaves_converges(): + """Regression: central hub with many leaves used to oscillate. + + The synchronous batch implementation flipped leaves between the hub's + community and their own community every iteration, never converging. + """ + edges = [(f"leaf{i}", "hub", 1) for i in range(20)] + projection = _make_projection(edges) + start = time.time() + clusters = label_propagation(projection) + elapsed = time.time() - start + _assert_partition(clusters, {"hub", *(f"leaf{i}" for i in range(20))}) + assert elapsed < 1.0, f"hub graph should converge quickly; took {elapsed:.2f}s" + + +def test_two_stars_joined_by_bridge(): + """Two hub+leaves clusters connected by one bridge edge. + + A correct community detector should identify two communities (one per + star). Earlier synchronous implementations could oscillate here. 
+ """ + edges = [ + *[(f"a_leaf{i}", "hub_a", 1) for i in range(10)], + *[(f"b_leaf{i}", "hub_b", 1) for i in range(10)], + ("hub_a", "hub_b", 1), ] - sampled = _select_representative_members(members, projection={}, sample_size=1) - assert sampled[0].uuid == 'b' - - -def test_deterministic_on_ties(): - """Same input produces the same partition across runs.""" - members = [_make_entity(f'e{i}') for i in range(5)] - projection: dict[str, list[Neighbor]] = { - 'e0': [Neighbor(node_uuid='e1', edge_count=1)], - 'e1': [ - Neighbor(node_uuid='e0', edge_count=1), - Neighbor(node_uuid='e2', edge_count=1), - ], - 'e2': [ - Neighbor(node_uuid='e1', edge_count=1), - Neighbor(node_uuid='e3', edge_count=1), - ], - 'e3': [ - Neighbor(node_uuid='e2', edge_count=1), - Neighbor(node_uuid='e4', edge_count=1), - ], - 'e4': [Neighbor(node_uuid='e3', edge_count=1)], - } - first = _select_representative_members(members, projection, sample_size=2) - second = _select_representative_members(members, projection, sample_size=2) - assert [m.uuid for m in first] == [m.uuid for m in second] - - -def test_only_counts_in_community_edges(): - """Edges to entities outside the community must be ignored. - - A node with many out-of-community connections but only a few in-community - edges should not outrank an in-community-focused node. + projection = _make_projection(edges) + clusters = label_propagation(projection) + _assert_partition( + clusters, + {"hub_a", "hub_b", *(f"a_leaf{i}" for i in range(10)), *(f"b_leaf{i}" for i in range(10))}, + ) + assert len(clusters) == 2 + + +def test_real_world_pathological_graph_converges(): + """Regression test from an observed production failure. + + A 48-node knowledge graph with a central "Threshold" node + (uuid `d689c03c`) connected to 14+ entities caused the synchronous + batch implementation to oscillate indefinitely — a fixed subset of + 19 nodes kept flipping between two states forever. + + This projection is a simplified version of the failing graph. 
With + the synchronous implementation it never returned; the async form + converges in milliseconds. """ - members = [_make_entity('insider'), _make_entity('insider2')] - projection: dict[str, list[Neighbor]] = { - 'insider': [ - # Many heavy edges to entities NOT in the cluster - Neighbor(node_uuid='outsider_a', edge_count=100), - Neighbor(node_uuid='outsider_b', edge_count=100), - # One light edge inside - Neighbor(node_uuid='insider2', edge_count=1), - ], - 'insider2': [ - Neighbor(node_uuid='insider', edge_count=1), - ], - } - sampled = _select_representative_members(members, projection, sample_size=1) - # Both have in-community degree 1; tie-broken by name desc → 'insider2' wins - assert sampled[0].uuid == 'insider2' - - -def test_summary_length_breaks_degree_ties(): - """When two nodes have the same in-community degree, the one with the - richer summary wins (since richer summaries contribute more to the - binary merge).""" - members = [ - _make_entity('a', summary='x' * 10), - _make_entity('b', summary='x' * 200), + # Hub node with heavy ties to several satellites + hub = "hub" + sat_heavy = [f"sat_h{i}" for i in range(4)] # strong connections to hub + sat_light = [f"sat_l{i}" for i in range(10)] # weak connections to hub + + edges: list[tuple[str, str, int]] = [] + # Strong ties: hub ↔ each heavy satellite (edge count 29) + edges.extend((hub, sat, 29) for sat in sat_heavy) + # Weak ties: hub ↔ each light satellite (edge count 1) + edges.extend((hub, sat, 1) for sat in sat_light) + # Triangle-ish ties among light satellites to create tie ambiguity + for i in range(0, len(sat_light) - 1, 2): + edges.append((sat_light[i], sat_light[i + 1], 1)) + # A few floating dyads that should form their own mini-communities + edges.append(("pair_a1", "pair_a2", 1)) + edges.append(("pair_b1", "pair_b2", 1)) + + projection = _make_projection(edges) + + start = time.time() + clusters = label_propagation(projection) + elapsed = time.time() - start + + all_nodes = {hub, 
*sat_heavy, *sat_light, "pair_a1", "pair_a2", "pair_b1", "pair_b2"} + _assert_partition(clusters, all_nodes) + assert elapsed < 1.0, f"pathological graph should converge fast; took {elapsed:.2f}s" + # Sanity: at least one community should contain the hub and its heavy ties + hub_cluster = next(c for c in clusters if hub in c) + for sat in sat_heavy: + assert sat in hub_cluster, f"{sat} should be in hub's community" + + +def test_deterministic_under_seed(): + """Same input produces the same partition across runs. + + The async form shuffles node order, but uses a fixed RNG seed so + results are reproducible. + """ + edges = [ + ("a", "b", 1), + ("b", "c", 1), + ("c", "a", 1), + ("d", "e", 1), + ("e", "f", 1), + ("f", "d", 1), + ("a", "d", 1), ] - projection: dict[str, list[Neighbor]] = { - 'a': [Neighbor(node_uuid='b', edge_count=1)], - 'b': [Neighbor(node_uuid='a', edge_count=1)], - } - sampled = _select_representative_members(members, projection, sample_size=1) - assert sampled[0].uuid == 'b' + projection = _make_projection(edges) + + first = label_propagation(projection) + second = label_propagation(projection) + + # Canonicalize (sort within cluster, sort list of clusters) + def canon(cs: list[list[str]]) -> list[list[str]]: + return sorted([sorted(c) for c in cs]) + + assert canon(first) == canon(second) + + +@pytest.mark.parametrize("n", [50, 200]) +def test_ring_graph_of_varying_sizes(n: int): + """Rings are edge cases for label propagation.""" + edges = [(f"r{i}", f"r{(i + 1) % n}", 1) for i in range(n)] + projection = _make_projection(edges) + start = time.time() + clusters = label_propagation(projection) + elapsed = time.time() - start + _assert_partition(clusters, {f"r{i}" for i in range(n)}) + assert elapsed < 2.0, f"ring of {n} should converge fast; took {elapsed:.2f}s"