Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 44 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ Supported Features:
* Various other service actions (e.g. request metrics, trace route)
* Bundled meshtastic web client for manual interaction with gateway
* MQTT client proxy support (forwards messages from radio to MQTT broker)
* MQTT downlink forwarding (subscribes to MQTT topics and forwards to radio)
* LoRa-to-MQTT uplink relay (re-encrypts received LoRa packets and publishes to MQTT)

For more details, see check the [documentation](#documentation).

Expand Down Expand Up @@ -305,6 +307,48 @@ Inside the Meshtastic Web Client:
4. Press "New Connection" - the correct hostname is already populated
5. Press "Connect"

## MQTT Proxy

The integration includes MQTT client proxy support for bridging LoRa mesh traffic over the internet via an MQTT broker.
This allows devices connected to different gateways (potentially hundreds of miles apart) to communicate as if they were
on the same local mesh.

### How It Works

When the Meshtastic device has MQTT enabled with `proxy_to_client_enabled`, the integration manages the MQTT connection
on behalf of the device. Three levels of functionality are available:

1. **Uplink (always on):** Messages the device *originates* are published to the MQTT broker by the firmware via
`mqttClientProxyMessage`. This is the existing behavior and requires no additional configuration.

2. **MQTT Downlink Forwarding (opt-in):** The integration subscribes to MQTT downlink topics for channels with
`downlink_enabled` and forwards incoming messages to the radio. This allows other MQTT gateways on the internet to
send messages into your local LoRa mesh.

3. **LoRa-to-MQTT Uplink Relay (opt-in):** The firmware in proxy mode only publishes packets the device *originates*,
not packets *received* from other LoRa nodes. When this option is enabled, the integration re-encrypts received LoRa
packets using the channel PSK and publishes them to MQTT, making your gateway a true bidirectional bridge.

### Configuration

Enable these features in the integration options under the **MQTT Proxy** section:

- **Enable MQTT Downlink Forwarding** — subscribes to MQTT topics and forwards to radio
- **Enable LoRa-to-MQTT Uplink Relay** — re-encrypts and publishes received LoRa packets to MQTT

Both options default to **disabled** to preserve existing behavior.

### Important Notes

- **Default channels are excluded:** High-traffic public channels (LongFast, LongSlow, MediumFast, MediumSlow,
ShortFast, ShortSlow) are automatically excluded from downlink subscription and uplink relay to prevent serial link
saturation (~320 msgs/sec on `mqtt.meshtastic.org` for LongFast alone).
- **Self-loop prevention:** Messages published by your own gateway are filtered from the downlink to prevent echo loops.
- **Channel PSK required:** The LoRa-to-MQTT relay re-encrypts packets using the channel's PSK. Channels without a PSK
(no encryption) are skipped.
- **Device configuration:** The device must have `mqtt.enabled`, `mqtt.proxy_to_client_enabled`, and per-channel
`uplink_enabled`/`downlink_enabled` set in its firmware configuration.

## Contributions are welcome!

If you want to contribute to this please read the [Contribution guidelines](CONTRIBUTING.md)
Expand Down
17 changes: 16 additions & 1 deletion custom_components/meshtastic/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,11 @@
CONF_CONNECTION_TCP_PORT,
CONF_CONNECTION_TYPE,
CONF_OPTION_FILTER_NODES,
CONF_OPTION_MQTT_PROXY,
CONF_OPTION_MQTT_PROXY_DOWNLINK,
CONF_OPTION_MQTT_PROXY_DOWNLINK_DEFAULT,
CONF_OPTION_MQTT_PROXY_UPLINK_RELAY,
CONF_OPTION_MQTT_PROXY_UPLINK_RELAY_DEFAULT,
CONF_OPTION_TCP_PROXY,
CONF_OPTION_TCP_PROXY_ENABLE,
CONF_OPTION_TCP_PROXY_ENABLE_DEFAULT,
Expand Down Expand Up @@ -132,7 +137,17 @@ async def async_setup_entry(
if coordinator.config_entry is None:
coordinator.config_entry = entry

client = MeshtasticApiClient(entry.data, hass=hass, config_entry_id=entry.entry_id)
client = MeshtasticApiClient(
entry.data,
hass=hass,
config_entry_id=entry.entry_id,
enable_mqtt_downlink=entry.options.get(CONF_OPTION_MQTT_PROXY, {}).get(
CONF_OPTION_MQTT_PROXY_DOWNLINK, CONF_OPTION_MQTT_PROXY_DOWNLINK_DEFAULT
),
enable_mqtt_uplink_relay=entry.options.get(CONF_OPTION_MQTT_PROXY, {}).get(
CONF_OPTION_MQTT_PROXY_UPLINK_RELAY, CONF_OPTION_MQTT_PROXY_UPLINK_RELAY_DEFAULT
),
)

try:
await client.connect()
Expand Down
243 changes: 236 additions & 7 deletions custom_components/meshtastic/aiomeshtastic/interface.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
localonly_pb2,
mesh_pb2,
module_config_pb2,
mqtt_pb2,
portnums_pb2,
telemetry_pb2,
)
Expand Down Expand Up @@ -129,6 +130,8 @@ def __init__( # noqa: PLR0913
acknowledgement_timeout: datetime.timedelta | None = None,
response_timeout: datetime.timedelta | None = None,
enable_mqtt_proxy: bool = True,
enable_mqtt_downlink: bool = False,
enable_mqtt_uplink_relay: bool = False,
) -> None:
self._logger = LOGGER.getChild(self.__class__.__name__)
self._connection = connection
Expand Down Expand Up @@ -172,6 +175,8 @@ def __init__( # noqa: PLR0913

# MQTT client for persistent connection
self._mqtt_proxy_enabled = enable_mqtt_proxy
self._mqtt_downlink_enabled = enable_mqtt_downlink
self._mqtt_uplink_relay_enabled = enable_mqtt_uplink_relay
if self._mqtt_proxy_enabled and not _has_aiomqtt:
self._logger.warning("Could not enable MQTT proxy because aiomqtt is not installed")
self._mqtt_proxy_enabled = False
Expand Down Expand Up @@ -441,7 +446,12 @@ async def _init_mqtt_client(self) -> None:
self._mqtt_connection_task = self._add_background_task(self._maintain_mqtt_connection(), name="mqtt-connection")

async def _maintain_mqtt_connection(self) -> None:
"""Maintains the MQTT connection and handles reconnections."""
"""Maintains the MQTT connection, subscribes to downlink topics, and forwards messages to the radio."""
# Default/public channel names with massive MQTT traffic on public brokers.
# These are skipped for downlink subscription to avoid saturating the serial link.
# (~320 msgs/sec on mqtt.meshtastic.org for LongFast alone)
self._mqtt_default_channels = {"LongFast", "LongSlow", "MediumFast", "MediumSlow", "ShortFast", "ShortSlow"}

while self.is_running:
try:
self._logger.debug("Connecting to MQTT broker")
Expand All @@ -453,11 +463,19 @@ async def _maintain_mqtt_connection(self) -> None:
self._mqtt_connected = True
self._logger.debug("Connected to MQTT broker")

# Wait until the interface is stopped
await self._is_stopped.wait()
# Subscribe to downlink topics for all enabled channels
if self._mqtt_downlink_enabled:
await self._subscribe_to_downlink_topics()

# Listen for incoming MQTT messages and forward to radio
async for message in self._mqtt_client.messages:
if not self.is_running:
break
await self._forward_mqtt_to_radio(message)

if not self.is_running:
break

# Interface stopped, don't reconnect
break
except MqttError as e:
self._logger.warning("Meshtastic MQTT proxy connection error: %s", e)
finally:
Expand All @@ -471,6 +489,92 @@ async def _maintain_mqtt_connection(self) -> None:
except asyncio.CancelledError:
break

async def _subscribe_to_downlink_topics(self) -> None:
"""Subscribe to MQTT downlink topics based on device channel configuration."""
if self._mqtt_client is None or not self._mqtt_connected:
return

mqtt_config = self._connected_node_module_config.mqtt
# mqtt.root already includes the region, e.g. "msh/US"
root = mqtt_config.root or "msh"

# Subscribe to downlink topics for each enabled channel.
# Default-preset channels (LongFast, etc.) are skipped because public MQTT
# brokers see 100-300+ msgs/sec on those topics, which saturates the serial
# link and blocks private-channel traffic. Uplink for these channels still
# works — only the downlink subscription is suppressed.
subscribed = False
if self._connected_node_channels:
for channel in self._connected_node_channels:
if channel.role == 0: # DISABLED
continue
channel_name = channel.settings.name or "LongFast"

# Skip default-preset channels (high-traffic on public brokers)
if channel_name in self._mqtt_default_channels:
self._logger.info(
"Skipping MQTT downlink for default channel '%s' "
"(high traffic on public brokers would saturate serial link)",
channel_name,
)
continue

if not channel.settings.downlink_enabled:
self._logger.debug(
"Skipping MQTT downlink for channel '%s' (downlink_enabled=False)",
channel_name,
)
continue

# Subscribe to encrypted channel topic
# Topic format: {root}/2/e/{channel_name}/# where root is e.g. "msh/US"
topic = f"{root}/2/e/{channel_name}/#"
try:
await self._mqtt_client.subscribe(topic, qos=1)
self._logger.info("Subscribed to MQTT downlink topic: %s", topic)
subscribed = True
except Exception:
self._logger.exception("Error subscribing to topic: %s", topic)
# Also subscribe to JSON/cleartext if json_enabled
if mqtt_config.json_enabled:
json_topic = f"{root}/2/c/{channel_name}/#"
try:
await self._mqtt_client.subscribe(json_topic, qos=1)
self._logger.info("Subscribed to MQTT JSON topic: %s", json_topic)
except Exception:
self._logger.exception("Error subscribing to JSON topic: %s", json_topic)

if not subscribed:
self._logger.info(
"No non-default channels with downlink enabled — "
"no MQTT downlink subscriptions created"
)

async def _forward_mqtt_to_radio(self, message: aiomqtt.Message) -> None:
"""Forward an incoming MQTT message to the radio as MqttClientProxyMessage."""
try:
topic = str(message.topic)

# Prevent self-loop: skip messages published by our own gateway
gateway_id = f"!{self._connected_node_info.my_node_num:08x}"
if topic.endswith(f"/{gateway_id}"):
self._logger.debug("Skipping downlink self-loop: topic=%s", topic)
return

proxy_message = mesh_pb2.MqttClientProxyMessage()
proxy_message.topic = topic
if message.payload:
proxy_message.data = bytes(message.payload)
proxy_message.retained = message.retain

to_radio = mesh_pb2.ToRadio()
to_radio.mqttClientProxyMessage.CopyFrom(proxy_message)

await self._connection.send_packet(to_radio)
self._logger.debug("Forwarded MQTT message to radio: topic=%s", topic)
except Exception:
self._logger.exception("Error forwarding MQTT message to radio")

async def _handle_mqtt_client_proxy_message(self, message: mesh_pb2.MqttClientProxyMessage) -> None:
"""
Handle MQTT client proxy messages from the radio.
Expand Down Expand Up @@ -507,9 +611,14 @@ async def _handle_mqtt_client_proxy_message(self, message: mesh_pb2.MqttClientPr
qos=1,
)
else:
self._logger.debug("No payload in MQTT message, ignoring")
# No payload = subscribe request from the firmware
self._logger.info("Subscribing to MQTT topic (firmware request): %s", message.topic)
try:
await self._mqtt_client.subscribe(message.topic, qos=1)
except Exception:
self._logger.exception("Error subscribing to MQTT topic: %s", message.topic)
except Exception:
self._logger.exception("Error publishing MQTT message")
self._logger.exception("Error handling MQTT proxy message")

@process_while_running
async def _heartbeat_loop(self) -> None:
Expand Down Expand Up @@ -615,6 +724,126 @@ async def _process_from_radio_packets_loop(self) -> None:

await self._process_packet_for_app_listener(from_radio)

# Relay LoRa-received packets to MQTT for channels with uplink_enabled
if self._mqtt_uplink_relay_enabled:
await self._relay_lora_to_mqtt(from_radio)

async def _relay_lora_to_mqtt(self, from_radio: mesh_pb2.FromRadio) -> None:
"""Relay LoRa-received packets to MQTT for channels with uplink_enabled.

The firmware in proxy mode only generates mqttClientProxyMessage for packets
the device *originates*. For received LoRa packets, we must re-encrypt the
decoded data and publish it to MQTT ourselves.
"""
if not from_radio.HasField("packet"):
return

mp = from_radio.packet

# Only relay packets from OTHER nodes (firmware handles its own)
if not self._connected_node_info or getattr(mp, "from") == self._connected_node_info.my_node_num:
return

# Skip PKI-encrypted packets (DMs) — these aren't channel-based
if mp.pki_encrypted:
return

# Must have decoded data (firmware decrypted it for us)
if not mp.HasField("decoded"):
return

# Check MQTT is connected
if not self._mqtt_connected or self._mqtt_client is None:
return

mqtt_config = self._connected_node_module_config.mqtt
if not mqtt_config.enabled or not mqtt_config.proxy_to_client_enabled:
return

# Find the channel config for this packet
channel_idx = mp.channel
if not self._connected_node_channels or channel_idx >= len(self._connected_node_channels):
return

channel = self._connected_node_channels[channel_idx]
if not channel.settings.uplink_enabled:
return

channel_name = channel.settings.name or "LongFast"

# Skip default channels (same policy as downlink)
if not hasattr(self, '_mqtt_default_channels') or channel_name in self._mqtt_default_channels:
return

# Re-encrypt the decoded data using the channel PSK
psk = bytes(channel.settings.psk)
key = self._expand_psk(psk)
if not key:
return

plaintext = mp.decoded.SerializeToString()
packet_id = mp.id
from_node = getattr(mp, "from")
encrypted = self._encrypt_packet(plaintext, key, packet_id, from_node)

# Build MeshPacket with encrypted field instead of decoded
encrypted_mp = mesh_pb2.MeshPacket()
encrypted_mp.CopyFrom(mp)
encrypted_mp.ClearField("decoded")
encrypted_mp.encrypted = encrypted

# Build ServiceEnvelope
envelope = mqtt_pb2.ServiceEnvelope()
envelope.packet.CopyFrom(encrypted_mp)
envelope.channel_id = channel_name
gateway_id = f"!{self._connected_node_info.my_node_num:08x}"
envelope.gateway_id = gateway_id

root = mqtt_config.root or "msh"
topic = f"{root}/2/e/{channel_name}/{gateway_id}"

try:
await self._mqtt_client.publish(topic, payload=envelope.SerializeToString(), qos=1)
self._logger.info(
"Relayed LoRa packet to MQTT: topic=%s from=!%08x port=%s",
topic, from_node, mp.decoded.portnum,
)
except Exception:
self._logger.exception("Error relaying LoRa packet to MQTT")

@staticmethod
def _expand_psk(psk: bytes) -> bytes:
"""Expand PSK to AES encryption key."""
if not psk:
return b"" # No encryption
if len(psk) == 1:
# Single-byte PSK = index into default key table
# Index 0 = no encryption, Index 1 = standard default key
if psk[0] == 0:
return b""
# Meshtastic default key (AES-128)
return bytes([
0xd4, 0xf1, 0xbb, 0x3a, 0x20, 0x29, 0x07, 0x59,
0xf0, 0xbc, 0xff, 0xab, 0xcf, 0x4e, 0x69, 0x01,
])
if len(psk) == 16:
return psk # AES-128
return psk # AES-256 (32 bytes)

@staticmethod
def _encrypt_packet(plaintext: bytes, key: bytes, packet_id: int, from_node: int) -> bytes:
"""Encrypt packet data using AES-CTR (Meshtastic encryption format)."""
import struct

from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes

# Nonce: packet_id (4B LE) + from_node (4B LE) + 8 zero bytes
nonce = struct.pack("<II", packet_id, from_node) + b"\x00" * 8

cipher = Cipher(algorithms.AES(key), modes.CTR(nonce))
encryptor = cipher.encryptor()
return encryptor.update(plaintext) + encryptor.finalize()

async def _process_packet_for_app_listener(self, from_radio: mesh_pb2.FromRadio) -> None: # noqa: PLR0912
packet = Packet(from_radio)
if packet.mesh_packet is None:
Expand Down
Loading