diff --git a/changelog.d/19609.feature b/changelog.d/19609.feature
new file mode 100644
index 00000000000..845f4ebbe38
--- /dev/null
+++ b/changelog.d/19609.feature
@@ -0,0 +1 @@
+Prioritise recently active rooms when updating profile information.
diff --git a/synapse/handlers/profile.py b/synapse/handlers/profile.py
index d123bcdd367..59f7d7cdce8 100644
--- a/synapse/handlers/profile.py
+++ b/synapse/handlers/profile.py
@@ -724,23 +724,116 @@ async def _update_join_states_task(
         assert task.params
         target_user = UserID.from_string(task.resource_id)
 
-        room_ids = sorted(await self.store.get_rooms_for_user(target_user.to_string()))
+        all_room_ids = await self.store.get_rooms_for_user(target_user.to_string())
+
+        # Get the full alphabetically sorted list for stable ordering
+        sorted_room_ids = sorted(all_room_ids)
 
         last_room_id = task.result.get("last_room_id", None) if task.result else None
+        processed_priority_rooms = set(
+            task.result.get("processed_priority_rooms", []) if task.result else []
+        )
+        # Get the stored priority room list if we're resuming
+        stored_priority_room_ids = (
+            task.result.get("priority_room_ids", []) if task.result else []
+        )
+
+        NUMBER_OF_PRIORITISED_ROOMS = 50
+        priority_room_ids: list[str] = []
+
+        # Determine which phase we're in and what to do
+        if last_room_id is None:
+            # We're in priority phase (or starting fresh)
+            if stored_priority_room_ids:
+                # Resuming priority phase - use stored list
+                priority_room_ids = stored_priority_room_ids
+            elif not processed_priority_rooms:
+                # Fresh start - get the most recently active rooms
+                try:
+                    # Get the last event position for each room
+                    room_activity = await self.store.bulk_get_last_event_pos_in_room_before_stream_ordering(
+                        all_room_ids,
+                        self.store.get_room_max_token(),  # Use current positions
+                    )
 
-        if last_room_id:
+                    # Sort rooms by activity (descending stream ordering)
+                    # Use room_id as tiebreaker for deterministic ordering
+                    # Higher activity first, then alphabetically by room_id for ties
+                    priority_room_ids = sorted(
+                        room_activity.keys(),
+                        key=lambda rid: (-room_activity.get(rid, 0), rid),
+                    )[:NUMBER_OF_PRIORITISED_ROOMS]
+
+                    # Store the priority list so we can resume if interrupted
+                    stored_priority_room_ids = priority_room_ids
+
+                except CancelledError:
+                    # Cancellation must propagate, not trigger the alphabetical fallback.
+                    raise
+                except Exception as e:
+                    # If we can't get priority rooms, fall back to alphabetical
+                    logger.warning(
+                        "Failed to get priority rooms for %s: %s. Falling back to alphabetical order.",
+                        target_user.to_string(),
+                        str(e),
+                    )
+                    priority_room_ids = []
+                    stored_priority_room_ids = []
+        else:
+            # We're in alphabetical phase (last_room_id is set), continue from there
             # Filter out room IDs that have already been handled
-            # by finding the first room ID greater than the last handled room ID
-            # and slicing the list from that point onwards.
-            room_ids = room_ids[bisect_right(room_ids, last_room_id) :]
+            sorted_room_ids = sorted_room_ids[
+                bisect_right(sorted_room_ids, last_room_id) :
+            ]
 
         requester = create_requester(
             user_id=target_user,
             authenticated_entity=task.params.get("requester_authenticated_entity"),
         )
 
-        for room_id in room_ids:
-            handler = self.hs.get_room_member_handler()
+        handler = self.hs.get_room_member_handler()
+
+        # Process priority rooms first (if any)
+        for room_id in priority_room_ids:
+            if room_id in processed_priority_rooms:
+                continue  # Skip if already processed in a previous run
+
+            try:
+                # Assume the target_user isn't a guest,
+                # because we don't let guests set profile or avatar data.
+                await handler.update_membership(
+                    requester,
+                    target_user,
+                    room_id,
+                    "join",  # We treat a profile update like a join.
+                    ratelimit=False,  # Try to hide that these events aren't atomic.
+                )
+            except CancelledError as e:
+                raise e
+            except Exception as e:
+                logger.warning(
+                    "Failed to update join event for priority room %s - %s",
+                    room_id,
+                    str(e),
+                )
+
+            processed_priority_rooms.add(room_id)
+            # Don't update last_room_id for priority rooms, keep it None
+            # so we know we're still in priority phase if interrupted
+            await self._task_scheduler.update_task(
+                task.id,
+                result={
+                    "last_room_id": None,
+                    "processed_priority_rooms": list(processed_priority_rooms),
+                    "priority_room_ids": stored_priority_room_ids,
+                },
+            )
+
+        # Now process all rooms in alphabetical order (skipping already-processed priority rooms)
+        for room_id in sorted_room_ids:
+            if room_id in processed_priority_rooms:
+                continue  # Skip rooms already processed in priority phase
+
             try:
                 # Assume the target_user isn't a guest,
                 # because we don't let guests set profile or avatar data.
@@ -758,7 +851,12 @@ async def _update_join_states_task(
                     "Failed to update join event for room %s - %s", room_id, str(e)
                 )
             await self._task_scheduler.update_task(
-                task.id, result={"last_room_id": room_id}
+                task.id,
+                result={
+                    "last_room_id": room_id,
+                    "processed_priority_rooms": list(processed_priority_rooms),
+                    "priority_room_ids": stored_priority_room_ids,
+                },
             )
 
         return TaskStatus.COMPLETE, None, None
diff --git a/tests/handlers/test_profile.py b/tests/handlers/test_profile.py
index 2e521a86b60..ca09f66e815 100644
--- a/tests/handlers/test_profile.py
+++ b/tests/handlers/test_profile.py
@@ -314,6 +314,108 @@ async def potentially_slow_update_membership(
             membership[state_tuple].content["displayname"], "Frank Jr."
         )
 
+    def test_background_update_prioritises_recently_active_rooms(self) -> None:
+        """Test that profile updates process recently active rooms first.
+
+        The room with the most recent activity must be updated before any other,
+        even when it sorts last in alphabetical room ID order.
+        """
+
+        # Set an initial displayname
+        self.get_success(
+            self.handler.set_displayname(
+                self.frank, synapse.types.create_requester(self.frank), "Frank"
+            )
+        )
+
+        # Create multiple rooms
+        room_ids = []
+        for _ in range(5):
+            room_id = self.helper.create_room_as(
+                self.frank.to_string(), tok=self.frank_token
+            )
+            room_ids.append(room_id)
+
+        # Sort room IDs alphabetically to understand the expected order
+        room_ids.sort()
+
+        # The last room alphabetically should be processed first if it's most active
+        last_room_alphabetically = room_ids[-1]
+        first_room_alphabetically = room_ids[0]
+
+        # Mock the bulk_get_last_event_pos_in_room_before_stream_ordering to make
+        # the last alphabetical room the most recently active
+        async def mock_bulk_get_last_event(
+            room_ids_param: list, end_stream_order_by_room: dict
+        ) -> dict:
+            # Return fake stream orderings that make the last alphabetical room most active
+            result = {}
+            for idx, room_id in enumerate(room_ids):
+                if room_id == last_room_alphabetically:
+                    # Give the last alphabetical room the highest stream ordering (most recent)
+                    result[room_id] = 1000
+                elif room_id == first_room_alphabetically:
+                    # Give the first alphabetical room a low stream ordering (least recent)
+                    result[room_id] = 100
+                else:
+                    # Give other rooms intermediate stream orderings
+                    result[room_id] = 500 - idx * 10
+            return result
+
+        # Track which rooms get updated and in what order
+        updated_rooms_order = []
+        original_update_membership = self.hs.get_room_member_handler().update_membership
+
+        async def track_update_membership(*args: Any, **kwargs: Any) -> tuple[str, int]:
+            room_id = args[2]  # room_id is the third argument
+            if room_id not in updated_rooms_order:
+                updated_rooms_order.append(room_id)
+            return await original_update_membership(*args, **kwargs)
+
+        with (
+            patch.object(
+                self.store,
+                "bulk_get_last_event_pos_in_room_before_stream_ordering",
+                side_effect=mock_bulk_get_last_event,
+            ),
+            patch.object(
+                self.hs.get_room_member_handler(),
+                "update_membership",
+                side_effect=track_update_membership,
+            ),
+        ):
+            # Trigger the profile update
+            self.get_success(
+                self.handler.set_displayname(
+                    self.frank, synapse.types.create_requester(self.frank), "Frank Jr."
+                )
+            )
+
+            # Let the background task run
+            self.pump(1.0)
+
+            # Wait for task to complete
+            tasks = self.get_success(
+                self.task_scheduler.get_tasks(
+                    actions=["update_join_states"], statuses=[TaskStatus.COMPLETE]
+                )
+            )
+            self.assertGreaterEqual(
+                len(tasks), 1, "Profile update task should complete"
+            )
+
+            # Verify that all rooms were eventually updated
+            self.assertEqual(
+                set(updated_rooms_order), set(room_ids), "All rooms should be updated"
+            )
+
+            self.assertIn(
+                last_room_alphabetically,
+                updated_rooms_order[:1],
+                f"Most active room {last_room_alphabetically} should be processed first, "
+                f"but rooms were processed in order: {updated_rooms_order}",
+            )
+
     def test_set_my_name_if_disabled(self) -> None:
         self.hs.config.registration.enable_set_displayname = False
 