Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -10,3 +10,4 @@ bin
lib
include
.Python
.DS_Store
28 changes: 27 additions & 1 deletion MQTTClient.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import logging
import multiprocessing
import time
import ssl

import paho.mqtt.client as mqtt

Expand Down Expand Up @@ -35,12 +36,33 @@ def __init__(self, messageQ, commandQ, config) -> None:
self._mqttConn = mqtt.Client(client_id='RFLinkGateway')
self._mqttConn.username_pw_set(self.config['mqtt_user'], self.config['mqtt_password'])

if self.config.get('mqtt_tls', False):
self.logger.info("Enabling MQTT TLS")

cafile = self.config.get('mqtt_ca') or None
if not cafile:
raise ValueError("mqtt_tls is enabled but mqtt_ca is not set")
certfile = self.config.get('mqtt_cert') or None
keyfile = self.config.get('mqtt_key') or None

self._mqttConn.tls_set(
ca_certs=cafile,
certfile=certfile,
keyfile=keyfile ,
tls_version=ssl.PROTOCOL_TLS_CLIENT
)
reject = self.config.get('mqtt_reject_unauthorized', False)
self._mqttConn.tls_insecure_set(not reject)

self.logger.info(
"TLS reject_unauthorized=%s", reject
)

self._mqttConn.on_disconnect = self._on_disconnect
self._mqttConn.on_publish = self._on_publish
self._mqttConn.on_message = self._on_message
self._mqttConn.on_connect = self._on_connect
self.connect(self.config)


def connect (self,config) -> None:
try:
Expand Down Expand Up @@ -84,6 +106,10 @@ def _on_message(self, client, userdata, message) -> None:

def publish(self, task) -> None:
topic = "%s/%s" % (self.config['mqtt_prefix'], task['topic'])

if self.config.get('mqtt_replace_spaces', False):
topic = topic.replace(" ", "_")

if self.mqttDataFormat == 'json':
if is_number(task['payload']):
task['payload'] = '{"value": ' + str(task['payload']) + '}'
Expand Down
63 changes: 49 additions & 14 deletions RFLinkGateway.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import os
import json
import logging
import multiprocessing
Expand All @@ -12,29 +13,63 @@
import SerialProcess

logger = logging.getLogger('RFLinkGW')
formatter = logging.Formatter('%(asctime)s - %(name)s - %(funcName)s - %(levelname)s - %(message)s')
logger.setLevel(logging.DEBUG)

ch = logging.StreamHandler()
ch.setFormatter(formatter)
ch.setLevel(logging.DEBUG)
logger.addHandler(ch)
def load_config():
# load config.json and overrides with environnment variables
config = {}

try:
with open('config.json') as f:
config = json.load(f)
logger.info("Configuration loaded from config.json")
except Exception as e:
logger.error("Failed to load config.json: %s", e)
exit(1)

env = {k.lower(): v for k, v in os.environ.items()}
for key in list(config.keys()):
key_lower = key.lower()
if key_lower in env:
raw_value = env[key_lower]

# parsing JSON (list, dict, bool, int…)
try:
value = json.loads(raw_value)
except Exception:
value = raw_value

logger.info("Config override: %s = %s (env)", key, value)
config[key] = value

return config

def setup_logger(config):
level_str = config.get("log_level", "DEBUG").upper()
level = getattr(logging, level_str, logging.DEBUG)

logger.setLevel(level)
formatter = logging.Formatter('%(asctime)s - %(name)s - %(funcName)s - %(levelname)s - %(message)s')

ch = logging.StreamHandler()
ch.setFormatter(formatter)
ch.setLevel(level)

# remove previous handlers to avoid duplicates (if reload)
if logger.hasHandlers():
logger.handlers.clear()
logger.addHandler(ch)

def main():
# load configuration
config = load_config()
setup_logger(config)
logger.info("Starting RFLinkGateway with log_level=%s", config.get("log_level"))

# messages read from device
messageQ = multiprocessing.Queue()
# messages written to device
commandQ = multiprocessing.Queue()

config = {}
try:
with open('config.json') as json_data:
config = json.load(json_data)
except Exception as e:
logger.error("Config load failed")
exit(1)

sp = SerialProcess.SerialProcess(messageQ, commandQ, config)
sp.daemon = True
sp.start()
Expand Down
22 changes: 22 additions & 0 deletions SerialProcess.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,9 @@ def __init__(self, messageQ, commandQ, config) -> None:

self.processing_wdir = config['rflink_wdir_output_params']

self.ignored_devices = config.get('rflink_ignored_devices', [])
self.logger.debug("Ignored devices config: %s", self.ignored_devices)

def close(self) -> None:
self.sp.close()
self.logger.debug('Serial closed')
Expand All @@ -36,6 +39,12 @@ def prepare_output(self, data_in) -> list:
if len(data) > 3 and data[0] == '20':
family = data[2]
deviceId = data[3].split("=")[1]
if self.is_device_ignored(family, deviceId):
self.logger.debug(
"Ignoring RFLink device %s/%s" % (family, deviceId)
)
return []

switch = data[4].split("=")[1]
d = {}
for t in data[4:]:
Expand Down Expand Up @@ -112,3 +121,16 @@ def run(self):
except Exception as e:
self.logger.error('Receive error: %s' % (e))
self.connect()

def is_device_ignored(self, family, deviceId) -> bool:
family = family.lower()
deviceId = deviceId.lower()
for entry in self.ignored_devices:
if "/" in entry:
fam, dev = entry.split("/", 1)
if fam.lower() == family and dev.lower() == deviceId:
return True
else:
if entry.lower() == family:
return True
return False
10 changes: 9 additions & 1 deletion config.json.sample
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,13 @@
"mqtt_message_timeout": 60,
"mqtt_user":"your_mqtt_user",
"mqtt_password":"your_mqtt_password",
"mqtt_tls": false,
"mqtt_reject_unauthorized": false,
"mqtt_ca": "",
"mqtt_cert": "",
"mqtt_key": "",
"mqtt_replace_spaces": true,
"log_level": "DEBUG",
"rflink_tty_device": "/dev/ttyUSB0",
"rflink_direct_output_params": [
"BAT",
Expand All @@ -24,5 +31,6 @@
],
"rflink_wdir_output_params": [
"WINDIR"
]
],
"rflink_ignored_devices": []
}
19 changes: 19 additions & 0 deletions readme.md
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,13 @@ Whole configuration is located in config.json file. You can copy and edit `confi
"mqtt_message_timeout": 60,
"mqtt_user":"your_mqtt_user",
"mqtt_password":"your_mqtt_password",
"mqtt_tls": false,
"mqtt_reject_unauthorized": false,
"mqtt_ca": "",
"mqtt_cert": "",
"mqtt_key": "",
"mqtt_replace_spaces": true,
"log_level": "DEBUG",
"rflink_tty_device": "/dev/ttyUSB0",
"rflink_direct_output_params": [
"BAT",
Expand All @@ -71,6 +78,10 @@ Whole configuration is located in config.json file. You can copy and edit `confi
],
"rflink_wdir_output_params": [
"WINDIR"
],
"rflink_ignored_devices": [
"RTS",
"Alecto v1/FE07"
]
}
```
Expand All @@ -81,10 +92,18 @@ config param | meaning
mqtt_port | MQTT broker port|
mqtt_prefix | prefix for publish and subscribe topic|
mqtt_format | publish and subscribe topic as `json` or `ascii` |
mqtt_tls | enable MQTT over TLS (mqtts) (`true` / `false`, default: `false`) |
mqtt_ca | path to CA certificate file used to validate MQTT broker |
mqtt_cert | path to client certificate file for mutual TLS authentication (optional) |
mqtt_key | path to client private key file for mutual TLS authentication (optional) |
mqtt_reject_unauthorized | reject invalid or untrusted server certificates (`true` = strict validation, default: `false`) |
mqtt_replace_spaces | replace spaces in MQTT topics with `_` (`true` to enable, default: `false`) |
log_level | Logging level (`DEBUG`, `INFO`, `WARNING`, `ERROR`, default: `DEBUG`) |
rflink_tty_device | Serial device |
rflink_direct_output_params | Parameters transferred to MQTT without any processing |
rflink_signed_output_params | Parameters with signed values |
rflink_wdir_output_params | Parameters with wind direction values |
rflink_ignored_devices | List of RFLink device families or specific devices to ignore (e.g. `RTS` or `RTS/AX67`) |



Expand Down