diff --git a/changelog.d/19714.bugfix b/changelog.d/19714.bugfix new file mode 100644 index 00000000000..6aba7b21a61 --- /dev/null +++ b/changelog.d/19714.bugfix @@ -0,0 +1 @@ +Have SSS return a new response immediately if a room subscription have changed and produced a new response. diff --git a/synapse/handlers/sliding_sync/__init__.py b/synapse/handlers/sliding_sync/__init__.py index 1cc587d4a7b..a3443b300cc 100644 --- a/synapse/handlers/sliding_sync/__init__.py +++ b/synapse/handlers/sliding_sync/__init__.py @@ -167,34 +167,38 @@ async def wait_for_sync_for_user( timeout_ms -= after_wait_ts - before_wait_ts timeout_ms = max(timeout_ms, 0) - # We're going to respond immediately if the timeout is 0 or if this is an - # initial sync (without a `from_token`) so we can avoid calling - # `notifier.wait_for_events()`. - if timeout_ms == 0 or from_token is None: - now_token = self.event_sources.get_current_token() - result = await self.current_sync_for_user( + # Compute a response immediately. We always need to do this before + # waiting for new data (unlike in /v3/sync), as the request config might + # have changed (e.g. new room subscriptions, etc). + now_token = self.event_sources.get_current_token() + result = await self.current_sync_for_user( + sync_config, + from_token=from_token, + to_token=now_token, + ) + + # Return immediately if we have a result, the timeout is 0, or this is + # an initial sync. + if result or timeout_ms == 0 or from_token is None: + return result, did_wait + + # Otherwise, we wait for something to happen and report it to the user. + async def current_sync_callback( + before_token: StreamToken, after_token: StreamToken + ) -> SlidingSyncResult: + return await self.current_sync_for_user( sync_config, from_token=from_token, - to_token=now_token, + to_token=after_token, ) - else: - # Otherwise, we wait for something to happen and report it to the user. - async def current_sync_callback( - before_token: StreamToken, after_token: StreamToken - ) -> SlidingSyncResult: - return await self.current_sync_for_user( - sync_config, - from_token=from_token, - to_token=after_token, - ) - result = await self.notifier.wait_for_events( - sync_config.user.to_string(), - timeout_ms, - current_sync_callback, - from_token=from_token.stream_token, - ) - did_wait = True + result = await self.notifier.wait_for_events( + sync_config.user.to_string(), + timeout_ms, + current_sync_callback, + from_token=now_token, + ) + did_wait = True return result, did_wait diff --git a/synapse/handlers/sliding_sync/room_lists.py b/synapse/handlers/sliding_sync/room_lists.py index 8969d915836..216ef3b0710 100644 --- a/synapse/handlers/sliding_sync/room_lists.py +++ b/synapse/handlers/sliding_sync/room_lists.py @@ -852,11 +852,15 @@ async def _filter_relevant_rooms_to_send( previous_connection_state.room_configs.get(room_id) ) if prev_room_sync_config is not None: - # Always include rooms whose timeline limit has increased. - # (see the "XXX: Odd behavior" described below) + # Always include rooms whose effective config has + # expanded. This covers timeline-limit increases and + # required-state additions introduced by room + # subscriptions overriding list-derived params. if ( - prev_room_sync_config.timeline_limit - < room_config.timeline_limit + prev_room_sync_config.combine_room_sync_config( + room_config + ) + != prev_room_sync_config ): rooms_should_send.add(room_id) continue diff --git a/tests/rest/client/sliding_sync/test_room_subscriptions.py b/tests/rest/client/sliding_sync/test_room_subscriptions.py index 811478f1ba7..d970af367d3 100644 --- a/tests/rest/client/sliding_sync/test_room_subscriptions.py +++ b/tests/rest/client/sliding_sync/test_room_subscriptions.py @@ -22,6 +22,7 @@ from synapse.api.constants import EventTypes, HistoryVisibility from synapse.rest.client import login, room, sync from synapse.server import HomeServer +from synapse.types import JsonDict from synapse.util.clock import Clock from tests.rest.client.sliding_sync.test_sliding_sync import SlidingSyncBase @@ -126,6 +127,124 @@ def test_room_subscriptions_with_join_membership(self) -> None: response_body["rooms"][room_id1], ) + def test_room_subscription_required_state_expansion_returns_immediately( + self, + ) -> None: + """ + Test that adding a room subscription with stronger params than the list causes an + incremental long-poll to return immediately, even without new stream activity. + """ + user1_id = self.register_user("user1", "pass") + user1_tok = self.login(user1_id, "pass") + + room_id1 = self.helper.create_room_as(user1_id, tok=user1_tok) + + sync_body: JsonDict = { + "lists": { + "foo-list": { + "ranges": [[0, 0]], + "required_state": [], + "timeline_limit": 0, + } + }, + "conn_id": "conn_id", + } + _, from_token = self.do_sync(sync_body, tok=user1_tok) + + sync_body["room_subscriptions"] = { + room_id1: { + "required_state": [ + [EventTypes.Create, ""], + ], + "timeline_limit": 0, + } + } + + channel = self.make_request( + "POST", + self.sync_endpoint + f"?timeout=10000&pos={from_token}", + content=sync_body, + access_token=user1_tok, + await_result=False, + ) + channel.await_result(timeout_ms=3000) + self.assertEqual(channel.code, 200, channel.json_body) + + state_map = self.get_success( + self.storage_controllers.state.get_current_state(room_id1) + ) + + room_response = channel.json_body["rooms"][room_id1] + self.assertNotIn("initial", room_response) + self._assertRequiredStateIncludes( + room_response["required_state"], + { + state_map[(EventTypes.Create, "")], + }, + exact=True, + ) + + def test_room_subscription_required_state_change_returns_immediately(self) -> None: + """ + Test that expanding an existing room subscription's required state causes an + incremental long-poll to return immediately, even without new stream activity. + """ + user1_id = self.register_user("user1", "pass") + user1_tok = self.login(user1_id, "pass") + + room_id1 = self.helper.create_room_as( + user1_id, tok=user1_tok, extra_content={"name": "Foo"} + ) + + sync_body: JsonDict = { + "room_subscriptions": { + room_id1: { + "required_state": [ + [EventTypes.Create, ""], + ], + "timeline_limit": 0, + } + }, + "conn_id": "conn_id", + } + response_body, from_token = self.do_sync(sync_body, tok=user1_tok) + + state_map = self.get_success( + self.storage_controllers.state.get_current_state(room_id1) + ) + self._assertRequiredStateIncludes( + response_body["rooms"][room_id1]["required_state"], + { + state_map[(EventTypes.Create, "")], + }, + exact=True, + ) + + sync_body["room_subscriptions"][room_id1]["required_state"] = [ + [EventTypes.Create, ""], + [EventTypes.Name, ""], + ] + + channel = self.make_request( + "POST", + self.sync_endpoint + f"?timeout=10000&pos={from_token}", + content=sync_body, + access_token=user1_tok, + await_result=False, + ) + channel.await_result(timeout_ms=3000) + self.assertEqual(channel.code, 200, channel.json_body) + + room_response = channel.json_body["rooms"][room_id1] + self.assertNotIn("initial", room_response) + self._assertRequiredStateIncludes( + room_response["required_state"], + { + state_map[(EventTypes.Name, "")], + }, + exact=True, + ) + def test_room_subscriptions_with_leave_membership(self) -> None: """ Test `room_subscriptions` with a leave room should give us timeline and state