From 0b56f31f8b80fb87e02f3c159e0bf3713a4be681 Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Fri, 22 May 2026 10:34:14 +0100 Subject: [PATCH 1/4] Correct misleading logging in `_add_messages_to_local_device_inbox_txn` --- synapse/storage/databases/main/deviceinbox.py | 56 +++++++++++-------- 1 file changed, 34 insertions(+), 22 deletions(-) diff --git a/synapse/storage/databases/main/deviceinbox.py b/synapse/storage/databases/main/deviceinbox.py index fc61f46c1c5..04e3057464f 100644 --- a/synapse/storage/databases/main/deviceinbox.py +++ b/synapse/storage/databases/main/deviceinbox.py @@ -897,7 +897,8 @@ def _add_messages_to_local_device_inbox_txn( ) -> None: assert self._can_write_to_device - local_by_user_then_device = {} + # A map from user id, to device id, to a pair of (serialized message, msgid). + local_by_user_then_device: dict[str, dict[str, tuple[str, str]]] = {} for user_id, messages_by_device in messages_by_user_then_device.items(): messages_json_for_user = {} devices = list(messages_by_device.keys()) @@ -912,11 +913,13 @@ def _add_messages_to_local_device_inbox_txn( retcol="device_id", ) - message_json = json_encoder.encode(messages_by_device["*"]) + message_json, msgid = _serialize_to_device_message( + user_id, "*", messages_by_device["*"] + ) for device_id in devices: # Add the message for all devices for this user on this # server. 
- messages_json_for_user[device_id] = message_json + messages_json_for_user[device_id] = (message_json, msgid) else: if not devices: continue @@ -938,19 +941,11 @@ def _add_messages_to_local_device_inbox_txn( for (device_id,) in rows: # Only insert into the local inbox if the device exists on # this server - with start_active_span("serialise_to_device_message"): - msg = messages_by_device[device_id] - set_tag(SynapseTags.TO_DEVICE_TYPE, msg["type"]) - set_tag(SynapseTags.TO_DEVICE_SENDER, msg["sender"]) - set_tag(SynapseTags.TO_DEVICE_RECIPIENT, user_id) - set_tag(SynapseTags.TO_DEVICE_RECIPIENT_DEVICE, device_id) - set_tag( - SynapseTags.TO_DEVICE_MSGID, - msg["content"].get(EventContentFields.TO_DEVICE_MSGID), - ) - message_json = json_encoder.encode(msg) - - messages_json_for_user[device_id] = message_json + msg = messages_by_device[device_id] + message_json, msgid = _serialize_to_device_message( + user_id, device_id, msg + ) + messages_json_for_user[device_id] = (message_json, msgid) if messages_json_for_user: local_by_user_then_device[user_id] = messages_json_for_user @@ -965,22 +960,21 @@ def _add_messages_to_local_device_inbox_txn( values=[ (user_id, device_id, stream_id, message_json, self._instance_name) for user_id, messages_by_device in local_by_user_then_device.items() - for device_id, message_json in messages_by_device.items() + for device_id, (message_json, _msgid) in messages_by_device.items() ], ) if issue9533_logger.isEnabledFor(logging.DEBUG): issue9533_logger.debug( - "Stored to-device messages with stream_id %i: %s", + "Storing to-device messages with stream_id %i: %s", stream_id, [ - f"{user_id}/{device_id} (msgid " - f"{msg['content'].get(EventContentFields.TO_DEVICE_MSGID)})" + f"{user_id}/{device_id} (msgid {msgid})" for ( user_id, messages_by_device, - ) in messages_by_user_then_device.items() - for (device_id, msg) in messages_by_device.items() + ) in local_by_user_then_device.items() + for (device_id, (_msg, msgid)) in 
messages_by_device.items() ], ) @@ -1066,6 +1060,24 @@ def get_devices_with_messages_txn( return results +def _serialize_to_device_message( + user_id: str, device_id: str, msg: JsonDict +) -> tuple[str, str]: + """Serialize a to-device message, ready to add to the device_inbox table. + + Returns a tuple (message_json, msgid). + """ + with start_active_span("serialise_to_device_message"): + msgid: str = msg["content"].get(EventContentFields.TO_DEVICE_MSGID, "") + set_tag(SynapseTags.TO_DEVICE_TYPE, msg["type"]) + set_tag(SynapseTags.TO_DEVICE_SENDER, msg["sender"]) + set_tag(SynapseTags.TO_DEVICE_RECIPIENT, user_id) + set_tag(SynapseTags.TO_DEVICE_RECIPIENT_DEVICE, device_id) + set_tag(SynapseTags.TO_DEVICE_MSGID, msgid) + message_json = json_encoder.encode(msg) + return message_json, msgid + + class DeviceInboxBackgroundUpdateStore(SQLBaseStore): DEVICE_INBOX_STREAM_ID = "device_inbox_stream_drop" REMOVE_DEAD_DEVICES_FROM_INBOX = "remove_dead_devices_from_device_inbox" From 004ea3a6a3118b1b2f5c06ffcc58b3765a2171dd Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Fri, 22 May 2026 11:54:37 +0100 Subject: [PATCH 2/4] Improve logging around sending replication updates In particular, include the "last_token" that we used to calculate whether there were any updates. Also a bit of general cleanup. 
--- synapse/replication/tcp/resource.py | 96 +++++++++++++---------------- 1 file changed, 43 insertions(+), 53 deletions(-) diff --git a/synapse/replication/tcp/resource.py b/synapse/replication/tcp/resource.py index 36dd39ed672..e9cbcd958e5 100644 --- a/synapse/replication/tcp/resource.py +++ b/synapse/replication/tcp/resource.py @@ -182,13 +182,8 @@ async def _run_notifier_loop(self) -> None: self.command_handler.will_announce_positions() for stream in all_streams: - self.command_handler.send_command( - PositionCommand( - stream.NAME, - self._instance_name, - stream.last_token, - stream.last_token, - ) + self._send_position_command( + stream.NAME, stream.last_token, stream.last_token ) for stream in all_streams: @@ -205,9 +200,9 @@ async def _run_notifier_loop(self) -> None: last_token = stream.last_token logger.debug( - "Getting stream: %s: %s -> %s", + "Getting stream updates for %s: %s -> %s", stream.NAME, - stream.last_token, + last_token, stream.current_token(self._instance_name), ) try: @@ -217,26 +212,7 @@ async def _run_notifier_loop(self) -> None: logger.info("Failed to handle stream %s", stream.NAME) raise - logger.debug( - "Sending %d updates", - len(updates), - ) - - if updates: - logger.info( - "Streaming: %s -> %s (limited: %s, updates: %s, max token: %s)", - stream.NAME, - updates[-1][0], - limited, - len(updates), - current_token, - ) - stream_updates_counter.labels( - stream_name=stream.NAME, - **{SERVER_NAME_LABEL: self.server_name}, - ).inc(len(updates)) - - else: + if not updates: # The token has advanced but there is no data to # send, so we send a `POSITION` to inform other # workers of the updated position. @@ -266,21 +242,26 @@ async def _run_notifier_loop(self) -> None: # POSITION with last token of X+1, which will # cause them to check if there were any missing # updates between X and X+1. 
- logger.info( - "Sending position: %s -> %s", - stream.NAME, - current_token, - ) - self.command_handler.send_command( - PositionCommand( - stream.NAME, - self._instance_name, - last_token, - current_token, - ) + self._send_position_command( + stream.NAME, last_token, current_token ) continue + logger.info( + "Streaming %s: %s -> %s (limited: %s, updates: %s, max token: %s)", + stream.NAME, + last_token, + updates[-1][0], + limited, + len(updates), + current_token, + ) + + stream_updates_counter.labels( + stream_name=stream.NAME, + **{SERVER_NAME_LABEL: self.server_name}, + ).inc(len(updates)) + # Some streams return multiple rows with the same stream IDs, # we need to make sure they get sent out in batches. We do # this by setting the current token to all but the last of @@ -300,18 +281,8 @@ async def _run_notifier_loop(self) -> None: # token, in which case we want to send out a `POSITION` # to tell other workers the actual current position. if updates[-1][0] < current_token: - logger.info( - "Sending position: %s -> %s", - stream.NAME, - current_token, - ) - self.command_handler.send_command( - PositionCommand( - stream.NAME, - self._instance_name, - updates[-1][0], - current_token, - ) + self._send_position_command( + stream.NAME, updates[-1][0], current_token ) logger.debug("No more pending updates, breaking poke loop") @@ -319,6 +290,25 @@ async def _run_notifier_loop(self) -> None: self.pending_updates = False self.is_looping = False + def _send_position_command( + self, stream_name: str, prev_token: int, new_token: int + ) -> None: + """Send a POSITION command over replication""" + logger.info( + "Sending position for %s: %s -> %s", + stream_name, + prev_token, + new_token, + ) + self.command_handler.send_command( + PositionCommand( + stream_name, + self._instance_name, + prev_token, + new_token, + ) + ) + def _batch_updates( updates: list[tuple[Token, StreamRow]], From a75ceb6c4d81147fc04f6baa2f88b737c85c597f Mon Sep 17 00:00:00 2001 From: Richard van der 
Hoff Date: Fri, 22 May 2026 11:56:57 +0100 Subject: [PATCH 3/4] Add logging to MultiWriterIdGenerator for to_device stream --- synapse/storage/util/id_generators.py | 25 ++++++++++++++++++++++++- 1 file changed, 24 insertions(+), 1 deletion(-) diff --git a/synapse/storage/util/id_generators.py b/synapse/storage/util/id_generators.py index a1afe6d2aee..d0a8433481e 100644 --- a/synapse/storage/util/id_generators.py +++ b/synapse/storage/util/id_generators.py @@ -38,6 +38,7 @@ import attr from sortedcontainers import SortedList, SortedSet +from synapse.logging import issue9533_logger from synapse.metrics.background_process_metrics import run_as_background_process from synapse.storage.database import ( DatabasePool, @@ -774,6 +775,14 @@ def _add_persisted_position(self, new_id: int) -> None: # We move the current min position up if the minimum current positions # of all instances is higher (since by definition all positions less # that that have been persisted). + # + # If we are one of several writers, then we don't need to factor our own + # `_current_position` into `_persisted_upto_position` unless we have unfinished + # writes (since we know that any future write that happens locally will have + # a higher stream ID than any of the other writers' current positions). In other + # words, if we have no outstanding writes, then the new `_persisted_upto_position` + # can be the minimum of all *other* writers' current positions. + # our_current_position = self._current_positions.get(self._instance_name, 0) min_curr = min( ( @@ -783,7 +792,6 @@ def _add_persisted_position(self, new_id: int) -> None: ), default=our_current_position, ) - if our_current_position and (self._unfinished_ids or self._in_flight_fetches): min_curr = min(min_curr, our_current_position) @@ -820,6 +828,21 @@ def _add_persisted_position(self, new_id: int) -> None: # do. 
break + # Hacky debug logging to attempt to trace https://github.com/element-hq/synapse/issues/19795 + if ( + issue9533_logger.isEnabledFor(logging.DEBUG) + and self._stream_name == "to_device" + ): + issue9533_logger.debug( + "stream id %i now persisted; _current_positions=%s _unfinished_ids=%s, _known_persisted_positions=%s _persisted_upto_position=%i min_curr=%i", + new_id, + self._current_positions, + self._unfinished_ids, + self._known_persisted_positions, + self._persisted_upto_position, + min_curr, + ) + def _update_stream_positions_table_txn(self, txn: Cursor) -> None: """Update the `stream_positions` table with newly persisted position.""" From d55d564ff11bcb739d72da18ae6ef21934e0883c Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Fri, 22 May 2026 11:58:30 +0100 Subject: [PATCH 4/4] changelog --- changelog.d/19801.misc | 1 + 1 file changed, 1 insertion(+) create mode 100644 changelog.d/19801.misc diff --git a/changelog.d/19801.misc b/changelog.d/19801.misc new file mode 100644 index 00000000000..2d10d9f4224 --- /dev/null +++ b/changelog.d/19801.misc @@ -0,0 +1 @@ +Add logging to the to-device message replication stream.