diff --git a/changelog.d/19785.bugfix b/changelog.d/19785.bugfix new file mode 100644 index 00000000000..643481ee276 --- /dev/null +++ b/changelog.d/19785.bugfix @@ -0,0 +1 @@ +Fix a long-standing bug where the badge notification count for a room could become permanently inflated if a read receipt was sent before the room's notification counts were first summarised. diff --git a/synapse/storage/databases/main/event_push_actions.py b/synapse/storage/databases/main/event_push_actions.py index a66caa672cd..ac60c45db57 100644 --- a/synapse/storage/databases/main/event_push_actions.py +++ b/synapse/storage/databases/main/event_push_actions.py @@ -1509,6 +1509,41 @@ def _handle_new_receipts_for_notifs_txn(self, txn: LoggingTransaction) -> bool: "last_receipt_stream_ordering": stream_ordering, }, ) + # If no summary row exists yet for a thread that has pending push + # actions (room active but not yet through a rotation cycle), the + # UPDATE above is a silent no-op for that thread and + # last_receipt_stream_ordering is never persisted. + # _rotate_notifs_before_txn would then INSERT the row with + # last_receipt_stream_ordering=NULL, causing the badge query to + # include every event before the receipt as unread. Pre-populate + # rows for every thread with pending push actions so rotation + # only counts events that arrive after this receipt. + txn.execute( + """ + SELECT DISTINCT thread_id + FROM event_push_actions + WHERE user_id = ? AND room_id = ? + """, + (user_id, room_id), + ) + pending_thread_ids = [row[0] for row in txn] + for pending_thread_id in pending_thread_ids: + self.db_pool.simple_upsert_txn( + txn, + table="event_push_summary", + keyvalues={ + "user_id": user_id, + "room_id": room_id, + "thread_id": pending_thread_id, + }, + values={}, + insertion_values={ + "notif_count": 0, + "unread_count": 0, + "stream_ordering": old_rotate_stream_ordering, + "last_receipt_stream_ordering": stream_ordering, + }, + ) # For a threaded receipt, we *always* want to update that receipt, # event if there are no new notifications in that thread. This ensures @@ -1517,8 +1552,10 @@ def _handle_new_receipts_for_notifs_txn(self, txn: LoggingTransaction) -> bool: unread_counts = [(0, 0, thread_id)] # Then any updated threads get their notification count and unread - # count updated. - self.db_pool.simple_update_many_txn( + # count updated. Use upsert so that a row is created if none exists + # yet (same race as the unthreaded case above: without this, rotation + # would INSERT with last_receipt_stream_ordering=NULL). + self.db_pool.simple_upsert_many_txn( txn, table="event_push_summary", key_names=("room_id", "user_id", "thread_id"), diff --git a/synapse/storage/schema/main/delta/94/05_backfill_event_push_summary_receipt.sql b/synapse/storage/schema/main/delta/94/05_backfill_event_push_summary_receipt.sql new file mode 100644 index 00000000000..63538291f29 --- /dev/null +++ b/synapse/storage/schema/main/delta/94/05_backfill_event_push_summary_receipt.sql @@ -0,0 +1,69 @@ +-- +-- This file is licensed under the Affero General Public License (AGPL) version 3. +-- +-- Copyright (C) 2026 New Vector Ltd +-- +-- This program is free software: you can redistribute it and/or modify +-- it under the terms of the GNU Affero General Public License as +-- published by the Free Software Foundation, either version 3 of the +-- License, or (at your option) any later version. +-- +-- See the GNU Affero General Public License for more details: +-- . + +-- Backfill last_receipt_stream_ordering for event_push_summary rows created +-- with last_receipt_stream_ordering=NULL by the rotation job before the code +-- fix in _handle_new_receipts_for_notifs_txn was applied. +-- +-- NULL has a dual meaning in this column (see the schema delta that added it): +-- 1. Legacy rows from old Synapse that maintained counts synchronously. +-- 2. Bug-affected rows where the receipt UPDATE was a silent no-op. +-- +-- For a given event_push_summary row, the relevant receipts are unthreaded +-- receipts (cover all threads) and the threaded receipt for that thread. +-- +-- For both kinds of stale row, if stream_ordering <= the max relevant receipt +-- then every event in the summary predates the receipt and the counts should +-- be zero. If stream_ordering > the max receipt, some events after the +-- receipt are included; we set last_receipt_stream_ordering but leave the +-- count (it may be inflated, but will self-correct when the user next reads +-- the room). +UPDATE event_push_summary +SET last_receipt_stream_ordering = ( + SELECT MAX(r.event_stream_ordering) + FROM receipts_linearized AS r + WHERE r.user_id = event_push_summary.user_id + AND r.room_id = event_push_summary.room_id + AND r.receipt_type IN ('m.read', 'm.read.private') + AND (r.thread_id IS NULL OR r.thread_id = event_push_summary.thread_id) + ), + notif_count = CASE + WHEN stream_ordering <= ( + SELECT MAX(r.event_stream_ordering) + FROM receipts_linearized AS r + WHERE r.user_id = event_push_summary.user_id + AND r.room_id = event_push_summary.room_id + AND r.receipt_type IN ('m.read', 'm.read.private') + AND (r.thread_id IS NULL OR r.thread_id = event_push_summary.thread_id) + ) THEN 0 + ELSE notif_count + END, + unread_count = CASE + WHEN stream_ordering <= ( + SELECT MAX(r.event_stream_ordering) + FROM receipts_linearized AS r + WHERE r.user_id = event_push_summary.user_id + AND r.room_id = event_push_summary.room_id + AND r.receipt_type IN ('m.read', 'm.read.private') + AND (r.thread_id IS NULL OR r.thread_id = event_push_summary.thread_id) + ) THEN 0 + ELSE unread_count + END +WHERE last_receipt_stream_ordering IS NULL + AND EXISTS ( + SELECT 1 FROM receipts_linearized AS r + WHERE r.user_id = event_push_summary.user_id + AND r.room_id = event_push_summary.room_id + AND r.receipt_type IN ('m.read', 'm.read.private') + AND (r.thread_id IS NULL OR r.thread_id = event_push_summary.thread_id) + ); diff --git a/tests/storage/test_event_push_actions.py b/tests/storage/test_event_push_actions.py index d5ed947094c..8a18cff317c 100644 --- a/tests/storage/test_event_push_actions.py +++ b/tests/storage/test_event_push_actions.py @@ -286,6 +286,115 @@ def _mark_read(event_id: str) -> None: _rotate() _assert_counts(0, 0) + def test_count_aggregation_receipt_before_first_rotation(self) -> None: + """ + Regression test: reading a highlight before the first rotation must not + permanently inflate the badge count. + + Highlights survive the receipt-triggered DELETE (highlight=1), so if + last_receipt_stream_ordering is NULL when rotation creates the summary + row, the badge query re-counts them forever. + """ + user_id, token, _, other_token, room_id = self._create_users_and_room() + + def _assert_badge(expected: int) -> None: + counts = self.get_success( + self.store.db_pool.runInteraction( + "get-aggregate-unread-counts", + self.store._get_unread_counts_by_room_for_user_txn, + user_id, + ) + ) + self.assertEqual(counts.get(room_id, 0), expected) + + def _send(highlight: bool = False) -> str: + return self.helper.send_event( + room_id, + type="m.room.message", + content={"msgtype": "m.text", "body": user_id if highlight else "msg"}, + tok=other_token, + )["event_id"] + + def _read(event_id: str) -> None: + self.get_success( + self.store.insert_receipt( + room_id, + "m.read", + user_id=user_id, + event_ids=[event_id], + thread_id=None, + data={}, + ) + ) + + # Highlight arrives; user reads it before any rotation (no summary row exists). + _read(_send(highlight=True)) + # One new event after the receipt makes stream_ordering > max_clause true. + _send() + # Without the fix: badge = 2 (highlight re-counted). With fix: badge = 1. + self.get_success(self.store._rotate_notifs()) + _assert_badge(1) + + def test_count_aggregation_receipt_before_first_rotation_in_thread(self) -> None: + """ + Same regression as test_count_aggregation_receipt_before_first_rotation, + but for a highlight inside a thread cleared by an unthreaded receipt. + + The fix must pre-populate event_push_summary for every thread with + pending push actions, not just MAIN_TIMELINE. + """ + user_id, token, _, other_token, room_id = self._create_users_and_room() + + def _assert_badge(expected: int) -> None: + counts = self.get_success( + self.store.db_pool.runInteraction( + "get-aggregate-unread-counts", + self.store._get_unread_counts_by_room_for_user_txn, + user_id, + ) + ) + self.assertEqual(counts.get(room_id, 0), expected) + + def _send(thread_root: str | None = None, highlight: bool = False) -> str: + content: JsonDict = { + "msgtype": "m.text", + "body": user_id if highlight else "msg", + } + if thread_root is not None: + content["m.relates_to"] = { + "rel_type": RelationTypes.THREAD, + "event_id": thread_root, + } + return self.helper.send_event( + room_id, + type="m.room.message", + content=content, + tok=other_token, + )["event_id"] + + def _read(event_id: str) -> None: + self.get_success( + self.store.insert_receipt( + room_id, + "m.read", + user_id=user_id, + event_ids=[event_id], + thread_id=None, + data={}, + ) + ) + + # A thread root, then a highlight inside that thread. + thread_root = _send() + thread_highlight = _send(thread_root=thread_root, highlight=True) + # User reads everything with an unthreaded receipt before any rotation. + _read(thread_highlight) + # A new event in the same thread makes stream_ordering > max_clause true. + _send(thread_root=thread_root) + # Without the fix: badge = 2 (thread highlight re-counted). With: badge = 1. + self.get_success(self.store._rotate_notifs()) + _assert_badge(1) + def test_count_aggregation_threads(self) -> None: """ This is essentially the same test as test_count_aggregation, but adds