Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/19801.misc
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Add logging to the to-device message replication stream.
96 changes: 43 additions & 53 deletions synapse/replication/tcp/resource.py
Original file line number Diff line number Diff line change
Expand Up @@ -182,13 +182,8 @@ async def _run_notifier_loop(self) -> None:
self.command_handler.will_announce_positions()

for stream in all_streams:
self.command_handler.send_command(
PositionCommand(
stream.NAME,
self._instance_name,
stream.last_token,
stream.last_token,
)
self._send_position_command(
stream.NAME, stream.last_token, stream.last_token
)

for stream in all_streams:
Expand All @@ -205,9 +200,9 @@ async def _run_notifier_loop(self) -> None:
last_token = stream.last_token

logger.debug(
"Getting stream: %s: %s -> %s",
"Getting stream updates for %s: %s -> %s",
stream.NAME,
stream.last_token,
last_token,
stream.current_token(self._instance_name),
)
try:
Expand All @@ -217,26 +212,7 @@ async def _run_notifier_loop(self) -> None:
logger.info("Failed to handle stream %s", stream.NAME)
raise

logger.debug(
"Sending %d updates",
len(updates),
)

if updates:
logger.info(
"Streaming: %s -> %s (limited: %s, updates: %s, max token: %s)",
stream.NAME,
updates[-1][0],
limited,
len(updates),
current_token,
)
stream_updates_counter.labels(
stream_name=stream.NAME,
**{SERVER_NAME_LABEL: self.server_name},
).inc(len(updates))

else:
if not updates:
# The token has advanced but there is no data to
# send, so we send a `POSITION` to inform other
# workers of the updated position.
Expand Down Expand Up @@ -266,21 +242,26 @@ async def _run_notifier_loop(self) -> None:
# POSITION with last token of X+1, which will
# cause them to check if there were any missing
# updates between X and X+1.
logger.info(
"Sending position: %s -> %s",
stream.NAME,
current_token,
)
self.command_handler.send_command(
PositionCommand(
stream.NAME,
self._instance_name,
last_token,
current_token,
)
self._send_position_command(
stream.NAME, last_token, current_token
)
continue

logger.info(
"Streaming %s: %s -> %s (limited: %s, updates: %s, max token: %s)",
stream.NAME,
last_token,
updates[-1][0],
limited,
len(updates),
current_token,
)

stream_updates_counter.labels(
stream_name=stream.NAME,
**{SERVER_NAME_LABEL: self.server_name},
).inc(len(updates))

# Some streams return multiple rows with the same stream IDs,
# we need to make sure they get sent out in batches. We do
# this by setting the current token to all but the last of
Expand All @@ -300,25 +281,34 @@ async def _run_notifier_loop(self) -> None:
# token, in which case we want to send out a `POSITION`
# to tell other workers the actual current position.
if updates[-1][0] < current_token:
logger.info(
"Sending position: %s -> %s",
stream.NAME,
current_token,
)
self.command_handler.send_command(
PositionCommand(
stream.NAME,
self._instance_name,
updates[-1][0],
current_token,
)
self._send_position_command(
stream.NAME, updates[-1][0], current_token
)

logger.debug("No more pending updates, breaking poke loop")
finally:
self.pending_updates = False
self.is_looping = False

def _send_position_command(
    self, stream_name: str, prev_token: int, new_token: int
) -> None:
    """Log and send a POSITION command over replication.

    Args:
        stream_name: Name of the stream the position applies to.
        prev_token: Token passed as the command's previous position.
        new_token: Token passed as the command's new position.
    """
    logger.info(
        "Sending position for %s: %s -> %s", stream_name, prev_token, new_token
    )

    # The command is attributed to this instance via `self._instance_name`.
    position = PositionCommand(
        stream_name,
        self._instance_name,
        prev_token,
        new_token,
    )
    self.command_handler.send_command(position)


def _batch_updates(
updates: list[tuple[Token, StreamRow]],
Expand Down
56 changes: 34 additions & 22 deletions synapse/storage/databases/main/deviceinbox.py
Original file line number Diff line number Diff line change
Expand Up @@ -897,7 +897,8 @@ def _add_messages_to_local_device_inbox_txn(
) -> None:
assert self._can_write_to_device

local_by_user_then_device = {}
# A map from user id, to device id, to a pair of (serialized message, msgid).
local_by_user_then_device: dict[str, dict[str, tuple[str, str]]] = {}
for user_id, messages_by_device in messages_by_user_then_device.items():
messages_json_for_user = {}
devices = list(messages_by_device.keys())
Expand All @@ -912,11 +913,13 @@ def _add_messages_to_local_device_inbox_txn(
retcol="device_id",
)

message_json = json_encoder.encode(messages_by_device["*"])
message_json, msgid = _serialize_to_device_message(
user_id, "*", messages_by_device["*"]
)
for device_id in devices:
# Add the message for all devices for this user on this
# server.
messages_json_for_user[device_id] = message_json
messages_json_for_user[device_id] = (message_json, msgid)
else:
if not devices:
continue
Expand All @@ -938,19 +941,11 @@ def _add_messages_to_local_device_inbox_txn(
for (device_id,) in rows:
# Only insert into the local inbox if the device exists on
# this server
with start_active_span("serialise_to_device_message"):
msg = messages_by_device[device_id]
set_tag(SynapseTags.TO_DEVICE_TYPE, msg["type"])
set_tag(SynapseTags.TO_DEVICE_SENDER, msg["sender"])
set_tag(SynapseTags.TO_DEVICE_RECIPIENT, user_id)
set_tag(SynapseTags.TO_DEVICE_RECIPIENT_DEVICE, device_id)
set_tag(
SynapseTags.TO_DEVICE_MSGID,
msg["content"].get(EventContentFields.TO_DEVICE_MSGID),
)
message_json = json_encoder.encode(msg)

messages_json_for_user[device_id] = message_json
msg = messages_by_device[device_id]
message_json, msgid = _serialize_to_device_message(
user_id, device_id, msg
)
messages_json_for_user[device_id] = (message_json, msgid)

if messages_json_for_user:
local_by_user_then_device[user_id] = messages_json_for_user
Expand All @@ -965,22 +960,21 @@ def _add_messages_to_local_device_inbox_txn(
values=[
(user_id, device_id, stream_id, message_json, self._instance_name)
for user_id, messages_by_device in local_by_user_then_device.items()
for device_id, message_json in messages_by_device.items()
for device_id, (message_json, _msgid) in messages_by_device.items()
],
)

if issue9533_logger.isEnabledFor(logging.DEBUG):
issue9533_logger.debug(
"Stored to-device messages with stream_id %i: %s",
"Storing to-device messages with stream_id %i: %s",
stream_id,
[
f"{user_id}/{device_id} (msgid "
f"{msg['content'].get(EventContentFields.TO_DEVICE_MSGID)})"
f"{user_id}/{device_id} (msgid {msgid})"
for (
user_id,
messages_by_device,
) in messages_by_user_then_device.items()
for (device_id, msg) in messages_by_device.items()
) in local_by_user_then_device.items()
for (device_id, (_msg, msgid)) in messages_by_device.items()
],
)

Expand Down Expand Up @@ -1066,6 +1060,24 @@ def get_devices_with_messages_txn(
return results


def _serialize_to_device_message(
    user_id: str, device_id: str, msg: JsonDict
) -> tuple[str, str]:
    """Serialize a to-device message, ready to add to the device_inbox table.

    The encoding happens inside a "serialise_to_device_message" tracing span,
    which is tagged with the message type, sender, recipient user, recipient
    device and message ID.

    Args:
        user_id: The recipient user, recorded as a tag on the span.
        device_id: The recipient device, recorded as a tag on the span.
        msg: The message to encode; its "type", "sender" and "content" keys
            are read here.

    Returns:
        A tuple (message_json, msgid). `msgid` is the empty string when the
        message content carries no message ID.
    """
    with start_active_span("serialise_to_device_message"):
        content = msg["content"]
        msgid: str = content.get(EventContentFields.TO_DEVICE_MSGID, "")

        # Tag the span so that the message can be traced to its recipient.
        set_tag(SynapseTags.TO_DEVICE_TYPE, msg["type"])
        set_tag(SynapseTags.TO_DEVICE_SENDER, msg["sender"])
        set_tag(SynapseTags.TO_DEVICE_RECIPIENT, user_id)
        set_tag(SynapseTags.TO_DEVICE_RECIPIENT_DEVICE, device_id)
        set_tag(SynapseTags.TO_DEVICE_MSGID, msgid)

        return json_encoder.encode(msg), msgid


class DeviceInboxBackgroundUpdateStore(SQLBaseStore):
DEVICE_INBOX_STREAM_ID = "device_inbox_stream_drop"
REMOVE_DEAD_DEVICES_FROM_INBOX = "remove_dead_devices_from_device_inbox"
Expand Down
25 changes: 24 additions & 1 deletion synapse/storage/util/id_generators.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import attr
from sortedcontainers import SortedList, SortedSet

from synapse.logging import issue9533_logger
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.storage.database import (
DatabasePool,
Expand Down Expand Up @@ -774,6 +775,14 @@ def _add_persisted_position(self, new_id: int) -> None:
# We move the current min position up if the minimum current positions
# of all instances is higher (since by definition all positions less
# than that have been persisted).
#
# If we are one of several writers, then we don't need to factor our own
# `_current_position` into `_persisted_upto_position` unless we have unfinished
# writes (since we know that any future write that happens locally will have
# a higher stream ID than any of the other writers' current positions). In other
# words, if we have no outstanding writes, then the new `_persisted_upto_position`
# can be the minimum of all *other* writers' current positions.
#
our_current_position = self._current_positions.get(self._instance_name, 0)
min_curr = min(
(
Expand All @@ -783,7 +792,6 @@ def _add_persisted_position(self, new_id: int) -> None:
),
default=our_current_position,
)

if our_current_position and (self._unfinished_ids or self._in_flight_fetches):
min_curr = min(min_curr, our_current_position)

Expand Down Expand Up @@ -820,6 +828,21 @@ def _add_persisted_position(self, new_id: int) -> None:
# do.
break

# Hacky debug logging to attempt to trace https://github.com/element-hq/synapse/issues/19795
if (
issue9533_logger.isEnabledFor(logging.DEBUG)
and self._stream_name == "to_device"
):
issue9533_logger.debug(
"stream id %i now persisted; _current_positions=%s _unfinished_ids=%s, _known_persisted_positions=%s _persisted_upto_position=%i min_curr=%i",
new_id,
self._current_positions,
self._unfinished_ids,
self._known_persisted_positions,
self._persisted_upto_position,
min_curr,
)

def _update_stream_positions_table_txn(self, txn: Cursor) -> None:
"""Update the `stream_positions` table with newly persisted position."""

Expand Down
Loading