Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
96 changes: 77 additions & 19 deletions graphiti_core/utils/maintenance/community_operations.py
Original file line number Diff line number Diff line change
Expand Up @@ -129,45 +129,103 @@ async def get_community_clusters(
return community_clusters


LABEL_PROPAGATION_OSCILLATION_WINDOW = 8
_LABEL_PROPAGATION_RNG_SEED = 42


def label_propagation(projection: dict[str, list[Neighbor]]) -> list[list[str]]:
# Implement the label propagation community detection algorithm.
# 1. Start with each node being assigned its own community
# 2. Each node will take on the community of the plurality of its neighbors
# 3. Ties are broken by going to the largest community
# 4. Continue until no communities change during propagation
# Asynchronous label propagation with shuffled node order and oscillation
# detection. This is the form described by Raghavan et al. (2007),
# "Near linear time algorithm to detect community structures in
# large-scale networks".
#
# Algorithm:
# 1. Each node starts in its own community.
# 2. In each pass, visit nodes in a FRESH random order.
# 3. For each node, move it to the plurality-weight community among its
# neighbors, using the CURRENT (in-place) community assignments.
# Reading the live state (not a snapshot) is the key correctness fix
# over the naive synchronous form — once a node flips, its neighbors
# see the new label immediately, breaking ping-pong loops.
# 4. Move a node only if the best candidate has strictly more support than
# the node's current community, or ties it and has a higher community id
# (a deterministic tie-break, so equally-supported labels cannot churn).
# 5. Terminate on natural convergence (no node changed in a full pass).
# As a belt-and-suspenders safeguard, also break if the full state
# repeats within a short recent window — async LPA is known to
# converge on undirected graphs, but a cycle detector catches any
# edge case we have not anticipated.
#
# Rationale: the synchronous form (batch update from a frozen snapshot)
# is vulnerable to flip-flop oscillation on graphs with high-degree hub
# nodes. Tied candidate scores cause groups of nodes to swap labels
# symmetrically every iteration, which repeats forever. Async updates
# eliminate that class of failure and empirically converge in O(log n)
# iterations on real-world graphs.

import random
from collections import deque

community_map = {uuid: i for i, uuid in enumerate(projection.keys())}
node_order = list(projection.keys())

rng = random.Random(_LABEL_PROPAGATION_RNG_SEED)
recent_state_hashes: deque[int] = deque(maxlen=LABEL_PROPAGATION_OSCILLATION_WINDOW)

while True:
rng.shuffle(node_order)
no_change = True
new_community_map: dict[str, int] = {}

for uuid, neighbors in projection.items():
for uuid in node_order:
neighbors = projection[uuid]
if not neighbors:
continue

curr_community = community_map[uuid]

community_candidates: dict[int, int] = defaultdict(int)
for neighbor in neighbors:
# In-place read — picks up changes from earlier in this pass.
community_candidates[community_map[neighbor.node_uuid]] += neighbor.edge_count
community_lst = [
(count, community) for community, count in community_candidates.items()
]

community_lst.sort(reverse=True)
candidate_rank, community_candidate = community_lst[0] if community_lst else (0, -1)
if community_candidate != -1 and candidate_rank > 1:
new_community = community_candidate
else:
new_community = max(community_candidate, curr_community)

new_community_map[uuid] = new_community
if not community_candidates:
continue

# Pick (count desc, community_id desc) — determinism on ties.
best_community, best_count = max(
community_candidates.items(),
key=lambda item: (item[1], item[0]),
)
curr_support = community_candidates.get(curr_community, 0)

# Only move on strict improvement, or on tie with a deterministic
# preference for the higher community id. This prevents a node
# from churning between equally-supported communities forever.
if best_count > curr_support:
new_community = best_community
elif best_count == curr_support and best_community > curr_community:
new_community = best_community
else:
new_community = curr_community

if new_community != curr_community:
community_map[uuid] = new_community
no_change = False

if no_change:
break

community_map = new_community_map
# Belt-and-suspenders: if the exact same community_map repeats
# within a short window, we are in a stable cycle — stop and keep
# whatever partition we have. Async LPA should not reach this path
# on real graphs; if it does, something is structurally unusual.
state_hash = hash(frozenset(community_map.items()))
if state_hash in recent_state_hashes:
logger.warning(
'label_propagation detected oscillation — using current clustering'
)
break
recent_state_hashes.append(state_hash)

community_cluster_map = defaultdict(list)
for uuid, community in community_map.items():
Expand Down
Loading
Loading