Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/19785.bugfix
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Fix a long-standing bug where the badge notification count for a room could become permanently inflated if a read receipt was sent before the room's notification counts were first summarised.
41 changes: 39 additions & 2 deletions synapse/storage/databases/main/event_push_actions.py
Original file line number Diff line number Diff line change
Expand Up @@ -1509,6 +1509,41 @@ def _handle_new_receipts_for_notifs_txn(self, txn: LoggingTransaction) -> bool:
"last_receipt_stream_ordering": stream_ordering,
},
)
# If no summary row exists yet for a thread that has pending push
# actions (room active but not yet through a rotation cycle), the
# UPDATE above is a silent no-op for that thread and
# last_receipt_stream_ordering is never persisted.
# _rotate_notifs_before_txn would then INSERT the row with
# last_receipt_stream_ordering=NULL, causing the badge query to
# include every event before the receipt as unread. Pre-populate
# rows for every thread with pending push actions so rotation
# only counts events that arrive after this receipt.
txn.execute(
"""
SELECT DISTINCT thread_id
FROM event_push_actions
WHERE user_id = ? AND room_id = ?
""",
(user_id, room_id),
)
pending_thread_ids = [row[0] for row in txn]
for pending_thread_id in pending_thread_ids:
self.db_pool.simple_upsert_txn(
txn,
table="event_push_summary",
keyvalues={
"user_id": user_id,
"room_id": room_id,
"thread_id": pending_thread_id,
},
values={},
insertion_values={
"notif_count": 0,
"unread_count": 0,
"stream_ordering": old_rotate_stream_ordering,
"last_receipt_stream_ordering": stream_ordering,
},
)

# For a threaded receipt, we *always* want to update that receipt,
# event if there are no new notifications in that thread. This ensures
Expand All @@ -1517,8 +1552,10 @@ def _handle_new_receipts_for_notifs_txn(self, txn: LoggingTransaction) -> bool:
unread_counts = [(0, 0, thread_id)]

# Then any updated threads get their notification count and unread
# count updated.
self.db_pool.simple_update_many_txn(
# count updated. Use upsert so that a row is created if none exists
# yet (same race as the unthreaded case above: without this, rotation
# would INSERT with last_receipt_stream_ordering=NULL).
self.db_pool.simple_upsert_many_txn(
txn,
table="event_push_summary",
key_names=("room_id", "user_id", "thread_id"),
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
--
-- This file is licensed under the Affero General Public License (AGPL) version 3.
--
-- Copyright (C) 2026 New Vector Ltd
--
-- This program is free software: you can redistribute it and/or modify
-- it under the terms of the GNU Affero General Public License as
-- published by the Free Software Foundation, either version 3 of the
-- License, or (at your option) any later version.
--
-- See the GNU Affero General Public License for more details:
-- <https://www.gnu.org/licenses/agpl-3.0.html>.

-- Backfill last_receipt_stream_ordering for event_push_summary rows created
-- with last_receipt_stream_ordering=NULL by the rotation job before the code
-- fix in _handle_new_receipts_for_notifs_txn was applied.
--
-- NULL has a dual meaning in this column (see the schema delta that added it):
-- 1. Legacy rows from old Synapse that maintained counts synchronously.
-- 2. Bug-affected rows where the receipt UPDATE was a silent no-op.
--
-- For a given event_push_summary row, the relevant receipts are unthreaded
-- receipts (cover all threads) and the threaded receipt for that thread.
--
-- For both kinds of stale row, if stream_ordering <= the max relevant receipt
-- then every event in the summary predates the receipt and the counts should
-- be zero. If stream_ordering > the max receipt, some events after the
-- receipt are included; we set last_receipt_stream_ordering but leave the
-- count (it may be inflated, but will self-correct when the user next reads
-- the room).
UPDATE event_push_summary
SET last_receipt_stream_ordering = (
SELECT MAX(r.event_stream_ordering)
FROM receipts_linearized AS r
WHERE r.user_id = event_push_summary.user_id
AND r.room_id = event_push_summary.room_id
AND r.receipt_type IN ('m.read', 'm.read.private')
AND (r.thread_id IS NULL OR r.thread_id = event_push_summary.thread_id)
),
notif_count = CASE
WHEN stream_ordering <= (
SELECT MAX(r.event_stream_ordering)
FROM receipts_linearized AS r
WHERE r.user_id = event_push_summary.user_id
AND r.room_id = event_push_summary.room_id
AND r.receipt_type IN ('m.read', 'm.read.private')
AND (r.thread_id IS NULL OR r.thread_id = event_push_summary.thread_id)
) THEN 0
ELSE notif_count
END,
unread_count = CASE
WHEN stream_ordering <= (
SELECT MAX(r.event_stream_ordering)
FROM receipts_linearized AS r
WHERE r.user_id = event_push_summary.user_id
AND r.room_id = event_push_summary.room_id
AND r.receipt_type IN ('m.read', 'm.read.private')
AND (r.thread_id IS NULL OR r.thread_id = event_push_summary.thread_id)
) THEN 0
ELSE unread_count
END
WHERE last_receipt_stream_ordering IS NULL
AND EXISTS (
SELECT 1 FROM receipts_linearized AS r
WHERE r.user_id = event_push_summary.user_id
AND r.room_id = event_push_summary.room_id
AND r.receipt_type IN ('m.read', 'm.read.private')
AND (r.thread_id IS NULL OR r.thread_id = event_push_summary.thread_id)
);
109 changes: 109 additions & 0 deletions tests/storage/test_event_push_actions.py
Original file line number Diff line number Diff line change
Expand Up @@ -286,6 +286,115 @@ def _mark_read(event_id: str) -> None:
_rotate()
_assert_counts(0, 0)

def test_count_aggregation_receipt_before_first_rotation(self) -> None:
"""
Regression test: reading a highlight before the first rotation must not
permanently inflate the badge count.

Highlights survive the receipt-triggered DELETE (highlight=1), so if
last_receipt_stream_ordering is NULL when rotation creates the summary
row, the badge query re-counts them forever.
"""
user_id, token, _, other_token, room_id = self._create_users_and_room()

def _assert_badge(expected: int) -> None:
counts = self.get_success(
self.store.db_pool.runInteraction(
"get-aggregate-unread-counts",
self.store._get_unread_counts_by_room_for_user_txn,
user_id,
)
)
self.assertEqual(counts.get(room_id, 0), expected)

def _send(highlight: bool = False) -> str:
return self.helper.send_event(
room_id,
type="m.room.message",
content={"msgtype": "m.text", "body": user_id if highlight else "msg"},
tok=other_token,
)["event_id"]

def _read(event_id: str) -> None:
self.get_success(
self.store.insert_receipt(
room_id,
"m.read",
user_id=user_id,
event_ids=[event_id],
thread_id=None,
data={},
)
)

# Highlight arrives; user reads it before any rotation (no summary row exists).
_read(_send(highlight=True))
# One new event after the receipt makes stream_ordering > max_clause true.
_send()
# Without the fix: badge = 2 (highlight re-counted). With fix: badge = 1.
self.get_success(self.store._rotate_notifs())
_assert_badge(1)

def test_count_aggregation_receipt_before_first_rotation_in_thread(self) -> None:
"""
Same regression as test_count_aggregation_receipt_before_first_rotation,
but for a highlight inside a thread cleared by an unthreaded receipt.

The fix must pre-populate event_push_summary for every thread with
pending push actions, not just MAIN_TIMELINE.
"""
user_id, token, _, other_token, room_id = self._create_users_and_room()

def _assert_badge(expected: int) -> None:
counts = self.get_success(
self.store.db_pool.runInteraction(
"get-aggregate-unread-counts",
self.store._get_unread_counts_by_room_for_user_txn,
user_id,
)
)
self.assertEqual(counts.get(room_id, 0), expected)

def _send(thread_root: str | None = None, highlight: bool = False) -> str:
content: JsonDict = {
"msgtype": "m.text",
"body": user_id if highlight else "msg",
}
if thread_root is not None:
content["m.relates_to"] = {
"rel_type": RelationTypes.THREAD,
"event_id": thread_root,
}
return self.helper.send_event(
room_id,
type="m.room.message",
content=content,
tok=other_token,
)["event_id"]

def _read(event_id: str) -> None:
self.get_success(
self.store.insert_receipt(
room_id,
"m.read",
user_id=user_id,
event_ids=[event_id],
thread_id=None,
data={},
)
)

# A thread root, then a highlight inside that thread.
thread_root = _send()
thread_highlight = _send(thread_root=thread_root, highlight=True)
# User reads everything with an unthreaded receipt before any rotation.
_read(thread_highlight)
# A new event in the same thread makes stream_ordering > max_clause true.
_send(thread_root=thread_root)
# Without the fix: badge = 2 (thread highlight re-counted). With: badge = 1.
self.get_success(self.store._rotate_notifs())
_assert_badge(1)

def test_count_aggregation_threads(self) -> None:
"""
This is essentially the same test as test_count_aggregation, but adds
Expand Down
Loading