From 48393390c46f88700dd323c619ae003527b0c320 Mon Sep 17 00:00:00 2001 From: Neil Johnson Date: Thu, 26 Mar 2026 12:19:27 +0000 Subject: [PATCH 1/5] Prioritise recently active rooms when updating profile information --- synapse/handlers/profile.py | 89 ++++++++++++++++++++++++++-- tests/handlers/test_profile.py | 102 +++++++++++++++++++++++++++++++++ 2 files changed, 185 insertions(+), 6 deletions(-) diff --git a/synapse/handlers/profile.py b/synapse/handlers/profile.py index d123bcdd367..f610816152a 100644 --- a/synapse/handlers/profile.py +++ b/synapse/handlers/profile.py @@ -724,23 +724,96 @@ async def _update_join_states_task( assert task.params target_user = UserID.from_string(task.resource_id) - room_ids = sorted(await self.store.get_rooms_for_user(target_user.to_string())) + all_room_ids = await self.store.get_rooms_for_user(target_user.to_string()) + + # Get the full alphabetically sorted list for stable ordering + sorted_room_ids = sorted(all_room_ids) last_room_id = task.result.get("last_room_id", None) if task.result else None + processed_priority_rooms = set( + task.result.get("processed_priority_rooms", []) if task.result else [] + ) + + NUMBER_OF_PRIORITISED_ROOMS = 50 + # If we haven't processed priority rooms yet (no last_room_id means fresh start) + if not last_room_id: + # Get the most recently active rooms for this user + try: + # Get the last event position for each room + room_activity = await self.store.bulk_get_last_event_pos_in_room_before_stream_ordering( + all_room_ids, + self.store.get_room_max_token(), # Use current positions + ) - if last_room_id: + # Sort rooms by activity (descending stream ordering) + priority_room_ids = sorted( + room_activity.keys(), + key=lambda rid: room_activity.get(rid, 0), + reverse=True, + )[:NUMBER_OF_PRIORITISED_ROOMS] + + except Exception as e: + # If we can't get priority rooms, fall back to alphabetical + logger.warning( + "Failed to get priority rooms for %s: %s. Falling back to alphabetical order.", + target_user.to_string(), + str(e), + ) + priority_room_ids = [] + else: + # We're resuming, skip priority processing + priority_room_ids = [] # Filter out room IDs that have already been handled # by finding the first room ID greater than the last handled room ID # and slicing the list from that point onwards. - room_ids = room_ids[bisect_right(room_ids, last_room_id) :] + sorted_room_ids = sorted_room_ids[ + bisect_right(sorted_room_ids, last_room_id) : + ] requester = create_requester( user_id=target_user, authenticated_entity=task.params.get("requester_authenticated_entity"), ) - for room_id in room_ids: - handler = self.hs.get_room_member_handler() + handler = self.hs.get_room_member_handler() + + # Process priority rooms first (if any) + for room_id in priority_room_ids: + if room_id in processed_priority_rooms: + continue # Skip if already processed in a previous run + + try: + # Assume the target_user isn't a guest, + # because we don't let guests set profile or avatar data. + await handler.update_membership( + requester, + target_user, + room_id, + "join", # We treat a profile update like a join. + ratelimit=False, # Try to hide that these events aren't atomic. + ) + except CancelledError as e: + raise e + except Exception as e: + logger.warning( + "Failed to update join event for priority room %s - %s", + room_id, + str(e), + ) + + processed_priority_rooms.add(room_id) + # Don't update last_room_id for priority rooms, keep it None + # so we know we're still in priority phase if interrupted + await self._task_scheduler.update_task( + task.id, + result={ + "last_room_id": None, + "processed_priority_rooms": list(processed_priority_rooms), + }, + ) + + # Now process all rooms in alphabetical order (including re-processing priority rooms) + for room_id in sorted_room_ids: try: # Assume the target_user isn't a guest, # because we don't let guests set profile or avatar data. @@ -758,7 +831,11 @@ async def _update_join_states_task( "Failed to update join event for room %s - %s", room_id, str(e) ) await self._task_scheduler.update_task( - task.id, result={"last_room_id": room_id} + task.id, + result={ + "last_room_id": room_id, + "processed_priority_rooms": list(processed_priority_rooms), + }, ) return TaskStatus.COMPLETE, None, None diff --git a/tests/handlers/test_profile.py b/tests/handlers/test_profile.py index 2e521a86b60..ca09f66e815 100644 --- a/tests/handlers/test_profile.py +++ b/tests/handlers/test_profile.py @@ -314,6 +314,108 @@ async def potentially_slow_update_membership( membership[state_tuple].content["displayname"], "Frank Jr." ) + def test_background_update_prioritises_recently_active_rooms(self) -> None: + """Test that profile updates process recently active rooms first. + + This test will fail with the current implementation (alphabetical order) + and pass with the new implementation (activity-based priority). + """ + + # Set an initial displayname + self.get_success( + self.handler.set_displayname( + self.frank, synapse.types.create_requester(self.frank), "Frank" + ) + ) + + # Create multiple rooms + room_ids = [] + for _ in range(5): + room_id = self.helper.create_room_as( + self.frank.to_string(), tok=self.frank_token + ) + room_ids.append(room_id) + + # Sort room IDs alphabetically to understand the expected order + room_ids.sort() + + # The last room alphabetically should be processed first if it's most active + last_room_alphabetically = room_ids[-1] + first_room_alphabetically = room_ids[0] + + # Mock the bulk_get_last_event_pos_in_room_before_stream_ordering to make + # the last alphabetical room the most recently active + async def mock_bulk_get_last_event( + room_ids_param: list, end_stream_order_by_room: dict + ) -> dict: + # Return fake stream orderings that make the last alphabetical room most active + result = {} + for idx, room_id in enumerate(room_ids): + if room_id == last_room_alphabetically: + # Give the last alphabetical room the highest stream ordering (most recent) + result[room_id] = 1000 + elif room_id == first_room_alphabetically: + # Give the first alphabetical room a low stream ordering (least recent) + result[room_id] = 100 + else: + # Give other rooms intermediate stream orderings + result[room_id] = 500 - idx * 10 + return result + + # Track which rooms get updated and in what order + updated_rooms_order = [] + original_update_membership = self.hs.get_room_member_handler().update_membership + + async def track_update_membership(*args: Any, **kwargs: Any) -> tuple[str, int]: + room_id = args[2] # room_id is the third argument + if room_id not in updated_rooms_order: + updated_rooms_order.append(room_id) + return await original_update_membership(*args, **kwargs) + + with ( + patch.object( + self.store, + "bulk_get_last_event_pos_in_room_before_stream_ordering", + side_effect=mock_bulk_get_last_event, + ), + patch.object( + self.hs.get_room_member_handler(), + "update_membership", + side_effect=track_update_membership, + ), + ): + # Trigger the profile update + self.get_success( + self.handler.set_displayname( + self.frank, synapse.types.create_requester(self.frank), "Frank Jr." + ) + ) + + # Let the background task run + self.pump(1.0) + + # Wait for task to complete + tasks = self.get_success( + self.task_scheduler.get_tasks( + actions=["update_join_states"], statuses=[TaskStatus.COMPLETE] + ) + ) + self.assertGreaterEqual( + len(tasks), 1, "Profile update task should complete" + ) + + # Verify that all rooms were eventually updated + self.assertEqual( + set(updated_rooms_order), set(room_ids), "All rooms should be updated" + ) + + self.assertIn( + last_room_alphabetically, + updated_rooms_order[:1], + f"Most active room {last_room_alphabetically} should be processed first, " + f"but rooms were processed in order: {updated_rooms_order}", + ) + def test_set_my_name_if_disabled(self) -> None: self.hs.config.registration.enable_set_displayname = False From 2c09bce0026262f0df8f4a03267e3658845003cc Mon Sep 17 00:00:00 2001 From: Neil Johnson Date: Thu, 26 Mar 2026 12:22:29 +0000 Subject: [PATCH 2/5] changelog --- changelog.d/19609.feature | 1 + 1 file changed, 1 insertion(+) create mode 100644 changelog.d/19609.feature diff --git a/changelog.d/19609.feature b/changelog.d/19609.feature new file mode 100644 index 00000000000..845f4ebbe38 --- /dev/null +++ b/changelog.d/19609.feature @@ -0,0 +1 @@ +Prioritise recently active rooms when updating profile information. From 6245988aaeec49382e229c66cee5a83fc70f0d74 Mon Sep 17 00:00:00 2001 From: Neil Johnson Date: Thu, 26 Mar 2026 13:15:44 +0000 Subject: [PATCH 3/5] add priority_room_ids to task state, do not refetch on resume. This fixes a race condition where the restart happens before any prioritised rooms canbe processed --- synapse/handlers/profile.py | 75 +++++++++++++++++++++++-------------- 1 file changed, 46 insertions(+), 29 deletions(-) diff --git a/synapse/handlers/profile.py b/synapse/handlers/profile.py index f610816152a..c8283a1a15a 100644 --- a/synapse/handlers/profile.py +++ b/synapse/handlers/profile.py @@ -733,39 +733,51 @@ async def _update_join_states_task( processed_priority_rooms = set( task.result.get("processed_priority_rooms", []) if task.result else [] ) + # Get the stored priority room list if we're resuming + stored_priority_room_ids = ( + task.result.get("priority_room_ids", []) if task.result else [] + ) NUMBER_OF_PRIORITISED_ROOMS = 50 - # If we haven't processed priority rooms yet (no last_room_id means fresh start) - if not last_room_id: - # Get the most recently active rooms for this user - try: - # Get the last event position for each room - room_activity = await self.store.bulk_get_last_event_pos_in_room_before_stream_ordering( - all_room_ids, - self.store.get_room_max_token(), # Use current positions - ) - - # Sort rooms by activity (descending stream ordering) - priority_room_ids = sorted( - room_activity.keys(), - key=lambda rid: room_activity.get(rid, 0), - reverse=True, - )[:NUMBER_OF_PRIORITISED_ROOMS] + priority_room_ids: list[str] = [] + + # Determine which phase we're in and what to do + if last_room_id is None: + # We're in priority phase (or starting fresh) + if stored_priority_room_ids: + # Resuming priority phase - use stored list + priority_room_ids = stored_priority_room_ids + elif not processed_priority_rooms: + # Fresh start - get the most recently active rooms + try: + # Get the last event position for each room + room_activity = await self.store.bulk_get_last_event_pos_in_room_before_stream_ordering( + all_room_ids, + self.store.get_room_max_token(), # Use current positions + ) - except Exception as e: - # If we can't get priority rooms, fall back to alphabetical - logger.warning( - "Failed to get priority rooms for %s: %s. Falling back to alphabetical order.", - target_user.to_string(), - str(e), - ) - priority_room_ids = [] + # Sort rooms by activity (descending stream ordering) + priority_room_ids = sorted( + room_activity.keys(), + key=lambda rid: room_activity.get(rid, 0), + reverse=True, + )[:NUMBER_OF_PRIORITISED_ROOMS] + + # Store the priority list so we can resume if interrupted + stored_priority_room_ids = priority_room_ids + + except Exception as e: + # If we can't get priority rooms, fall back to alphabetical + logger.warning( + "Failed to get priority rooms for %s: %s. Falling back to alphabetical order.", + target_user.to_string(), + str(e), + ) + priority_room_ids = [] + stored_priority_room_ids = [] else: - # We're resuming, skip priority processing - priority_room_ids = [] + # We're in alphabetical phase (last_room_id is set), continue from there # Filter out room IDs that have already been handled - # by finding the first room ID greater than the last handled room ID - # and slicing the list from that point onwards. sorted_room_ids = sorted_room_ids[ bisect_right(sorted_room_ids, last_room_id) : ] @@ -809,11 +821,15 @@ async def _update_join_states_task( result={ "last_room_id": None, "processed_priority_rooms": list(processed_priority_rooms), + "priority_room_ids": stored_priority_room_ids, }, ) - # Now process all rooms in alphabetical order (including re-processing priority rooms) + # Now process all rooms in alphabetical order (skipping already-processed priority rooms) for room_id in sorted_room_ids: + if room_id in processed_priority_rooms: + continue # Skip rooms already processed in priority phase + try: # Assume the target_user isn't a guest, # because we don't let guests set profile or avatar data. @@ -835,6 +851,7 @@ async def _update_join_states_task( result={ "last_room_id": room_id, "processed_priority_rooms": list(processed_priority_rooms), + "priority_room_ids": stored_priority_room_ids, }, ) From 596ffb81c27c8f700988fecf4c24a9c6a6fb8d12 Mon Sep 17 00:00:00 2001 From: Neil Johnson Date: Thu, 26 Mar 2026 13:18:22 +0000 Subject: [PATCH 4/5] lint --- synapse/handlers/profile.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/synapse/handlers/profile.py b/synapse/handlers/profile.py index c8283a1a15a..1ef71557e3c 100644 --- a/synapse/handlers/profile.py +++ b/synapse/handlers/profile.py @@ -740,7 +740,7 @@ async def _update_join_states_task( NUMBER_OF_PRIORITISED_ROOMS = 50 priority_room_ids: list[str] = [] - + # Determine which phase we're in and what to do if last_room_id is None: # We're in priority phase (or starting fresh) @@ -829,7 +829,7 @@ async def _update_join_states_task( for room_id in sorted_room_ids: if room_id in processed_priority_rooms: continue # Skip rooms already processed in priority phase - + try: # Assume the target_user isn't a guest, # because we don't let guests set profile or avatar data. From a27757bfb260bbdc1c73eba95e4479f83809245d Mon Sep 17 00:00:00 2001 From: Neil Johnson Date: Thu, 26 Mar 2026 13:37:55 +0000 Subject: [PATCH 5/5] tests require deterministic ordering of priority rooms --- synapse/handlers/profile.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/synapse/handlers/profile.py b/synapse/handlers/profile.py index 1ef71557e3c..59f7d7cdce8 100644 --- a/synapse/handlers/profile.py +++ b/synapse/handlers/profile.py @@ -757,10 +757,11 @@ async def _update_join_states_task( ) # Sort rooms by activity (descending stream ordering) + # Use room_id as tiebreaker for deterministic ordering + # Higher activity first, then alphabetically by room_id for ties priority_room_ids = sorted( room_activity.keys(), - key=lambda rid: room_activity.get(rid, 0), - reverse=True, + key=lambda rid: (-room_activity.get(rid, 0), rid), )[:NUMBER_OF_PRIORITISED_ROOMS] # Store the priority list so we can resume if interrupted