Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/19609.feature
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Prioritise recently active rooms when updating profile information.
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Based on the previous PR #17074 for reference

111 changes: 103 additions & 8 deletions synapse/handlers/profile.py
Original file line number Diff line number Diff line change
Expand Up @@ -724,23 +724,113 @@ async def _update_join_states_task(
assert task.params

target_user = UserID.from_string(task.resource_id)
room_ids = sorted(await self.store.get_rooms_for_user(target_user.to_string()))
all_room_ids = await self.store.get_rooms_for_user(target_user.to_string())

# Get the full alphabetically sorted list for stable ordering
sorted_room_ids = sorted(all_room_ids)

last_room_id = task.result.get("last_room_id", None) if task.result else None
processed_priority_rooms = set(
task.result.get("processed_priority_rooms", []) if task.result else []
)
# Get the stored priority room list if we're resuming
stored_priority_room_ids = (
task.result.get("priority_room_ids", []) if task.result else []
)

NUMBER_OF_PRIORITISED_ROOMS = 50
priority_room_ids: list[str] = []

# Determine which phase we're in and what to do
if last_room_id is None:
# We're in priority phase (or starting fresh)
if stored_priority_room_ids:
# Resuming priority phase - use stored list
priority_room_ids = stored_priority_room_ids
elif not processed_priority_rooms:
# Fresh start - get the most recently active rooms
try:
# Get the last event position for each room
room_activity = await self.store.bulk_get_last_event_pos_in_room_before_stream_ordering(
all_room_ids,
self.store.get_room_max_token(), # Use current positions
)

if last_room_id:
# Sort rooms by activity (descending stream ordering)
# Use room_id as tiebreaker for deterministic ordering
# Higher activity first, then alphabetically by room_id for ties
priority_room_ids = sorted(
room_activity.keys(),
key=lambda rid: (-room_activity.get(rid, 0), rid),
)[:NUMBER_OF_PRIORITISED_ROOMS]

# Store the priority list so we can resume if interrupted
stored_priority_room_ids = priority_room_ids

except Exception as e:
# If we can't get priority rooms, fall back to alphabetical
logger.warning(
"Failed to get priority rooms for %s: %s. Falling back to alphabetical order.",
target_user.to_string(),
str(e),
)
priority_room_ids = []
stored_priority_room_ids = []
else:
# We're in alphabetical phase (last_room_id is set), continue from there
# Filter out room IDs that have already been handled
# by finding the first room ID greater than the last handled room ID
# and slicing the list from that point onwards.
room_ids = room_ids[bisect_right(room_ids, last_room_id) :]
sorted_room_ids = sorted_room_ids[
bisect_right(sorted_room_ids, last_room_id) :
]

requester = create_requester(
user_id=target_user,
authenticated_entity=task.params.get("requester_authenticated_entity"),
)

for room_id in room_ids:
handler = self.hs.get_room_member_handler()
handler = self.hs.get_room_member_handler()

# Process priority rooms first (if any)
for room_id in priority_room_ids:
if room_id in processed_priority_rooms:
continue # Skip if already processed in a previous run

try:
# Assume the target_user isn't a guest,
# because we don't let guests set profile or avatar data.
await handler.update_membership(
requester,
target_user,
room_id,
"join", # We treat a profile update like a join.
ratelimit=False, # Try to hide that these events aren't atomic.
)
except CancelledError as e:
raise e
except Exception as e:
logger.warning(
"Failed to update join event for priority room %s - %s",
room_id,
str(e),
)

processed_priority_rooms.add(room_id)
# Don't update last_room_id for priority rooms, keep it None
# so we know we're still in priority phase if interrupted
await self._task_scheduler.update_task(
task.id,
result={
"last_room_id": None,
"processed_priority_rooms": list(processed_priority_rooms),
"priority_room_ids": stored_priority_room_ids,
},
)

# Now process all rooms in alphabetical order (skipping already-processed priority rooms)
for room_id in sorted_room_ids:
if room_id in processed_priority_rooms:
continue # Skip rooms already processed in priority phase

try:
# Assume the target_user isn't a guest,
# because we don't let guests set profile or avatar data.
Expand All @@ -758,7 +848,12 @@ async def _update_join_states_task(
"Failed to update join event for room %s - %s", room_id, str(e)
)
await self._task_scheduler.update_task(
task.id, result={"last_room_id": room_id}
task.id,
result={
"last_room_id": room_id,
"processed_priority_rooms": list(processed_priority_rooms),
"priority_room_ids": stored_priority_room_ids,
},
)

return TaskStatus.COMPLETE, None, None
Expand Down
102 changes: 102 additions & 0 deletions tests/handlers/test_profile.py
Original file line number Diff line number Diff line change
Expand Up @@ -314,6 +314,108 @@ async def potentially_slow_update_membership(
membership[state_tuple].content["displayname"], "Frank Jr."
)

def test_background_update_prioritises_recently_active_rooms(self) -> None:
"""Test that profile updates process recently active rooms first.

This test will fail with the current implementation (alphabetical order)
and pass with the new implementation (activity-based priority).
"""

# Set an initial displayname
self.get_success(
self.handler.set_displayname(
self.frank, synapse.types.create_requester(self.frank), "Frank"
)
)

# Create multiple rooms
room_ids = []
for _ in range(5):
room_id = self.helper.create_room_as(
self.frank.to_string(), tok=self.frank_token
)
room_ids.append(room_id)

# Sort room IDs alphabetically to understand the expected order
room_ids.sort()

# The last room alphabetically should be processed first if it's most active
last_room_alphabetically = room_ids[-1]
first_room_alphabetically = room_ids[0]

# Mock the bulk_get_last_event_pos_in_room_before_stream_ordering to make
# the last alphabetical room the most recently active
async def mock_bulk_get_last_event(
room_ids_param: list, end_stream_order_by_room: dict
) -> dict:
# Return fake stream orderings that make the last alphabetical room most active
result = {}
for idx, room_id in enumerate(room_ids):
if room_id == last_room_alphabetically:
# Give the last alphabetical room the highest stream ordering (most recent)
result[room_id] = 1000
elif room_id == first_room_alphabetically:
# Give the first alphabetical room a low stream ordering (least recent)
result[room_id] = 100
else:
# Give other rooms intermediate stream orderings
result[room_id] = 500 - idx * 10
return result

# Track which rooms get updated and in what order
updated_rooms_order = []
original_update_membership = self.hs.get_room_member_handler().update_membership

async def track_update_membership(*args: Any, **kwargs: Any) -> tuple[str, int]:
room_id = args[2] # room_id is the third argument
if room_id not in updated_rooms_order:
updated_rooms_order.append(room_id)
return await original_update_membership(*args, **kwargs)

with (
patch.object(
self.store,
"bulk_get_last_event_pos_in_room_before_stream_ordering",
side_effect=mock_bulk_get_last_event,
),
patch.object(
self.hs.get_room_member_handler(),
"update_membership",
side_effect=track_update_membership,
),
):
# Trigger the profile update
self.get_success(
self.handler.set_displayname(
self.frank, synapse.types.create_requester(self.frank), "Frank Jr."
)
)

# Let the background task run
self.pump(1.0)

# Wait for task to complete
tasks = self.get_success(
self.task_scheduler.get_tasks(
actions=["update_join_states"], statuses=[TaskStatus.COMPLETE]
)
)
self.assertGreaterEqual(
len(tasks), 1, "Profile update task should complete"
)

# Verify that all rooms were eventually updated
self.assertEqual(
set(updated_rooms_order), set(room_ids), "All rooms should be updated"
)

self.assertIn(
last_room_alphabetically,
updated_rooms_order[:1],
f"Most active room {last_room_alphabetically} should be processed first, "
f"but rooms were processed in order: {updated_rooms_order}",
)

def test_set_my_name_if_disabled(self) -> None:
self.hs.config.registration.enable_set_displayname = False

Expand Down
Loading