From 4bc64b05ede187192d91432ff58b5fd264ea7704 Mon Sep 17 00:00:00 2001 From: LePailleurThibault Date: Mon, 31 Oct 2022 15:33:02 +0100 Subject: [PATCH 1/3] MQTT compatibility with QoS Some MQTT brokers do not support qos=2. Changing transport service to publish with qos=1. Adding a cache in transport service to avoid sending redundant messages. --- python_transport/tests/test_arguments.py | 15 ++- .../wirepas_gateway/protocol/mqtt_wrapper.py | 2 +- .../wirepas_gateway/transport_service.py | 123 ++++++++++++++++-- .../wirepas_gateway/utils/argument_tools.py | 26 ++++ 4 files changed, 153 insertions(+), 13 deletions(-) diff --git a/python_transport/tests/test_arguments.py b/python_transport/tests/test_arguments.py index 9d2ebc52..4edb9dad 100644 --- a/python_transport/tests/test_arguments.py +++ b/python_transport/tests/test_arguments.py @@ -22,6 +22,8 @@ env_vars["WM_GW_BUFFERING_MAX_BUFFERED_PACKETS"] = 1000 env_vars["WM_GW_BUFFERING_MAX_DELAY_WITHOUT_PUBLISH"] = 128 env_vars["WM_GW_BUFFERING_MINIMAL_SINK_COST"] = 240 +env_vars["WM_CACHE_TIME_WINDOW"] = 1800 +env_vars["WM_CACHE_UPDATE_S"] = 60 env_vars["WM_GW_ID"] = "1" env_vars["WM_GW_MODEL"] = "test" env_vars["WM_GW_VERSION"] = "pytest" @@ -54,6 +56,8 @@ "WM_GW_BUFFERING_MAX_DELAY_WITHOUT_PUBLISH" ] file_vars["buffering_minimal_sink_cost"] = env_vars["WM_GW_BUFFERING_MINIMAL_SINK_COST"] +file_vars["cache_time_window"] = env_vars["WM_CACHE_TIME_WINDOW"] +file_vars["cache_update_s"] = env_vars["WM_CACHE_UPDATE_S"] file_vars["gateway_id"] = env_vars["WM_GW_ID"] file_vars["gateway_model"] = env_vars["WM_GW_MODEL"] @@ -153,6 +157,14 @@ def content_tests(settings, vcopy): vcopy["WM_GW_BUFFERING_MINIMAL_SINK_COST"] == settings.buffering_minimal_sink_cost ) + assert ( + vcopy["WM_CACHE_TIME_WINDOW"] + == settings.cache_time_window + ) + assert ( + vcopy["WM_CACHE_UPDATE_S"] + == settings.cache_update_s + ) assert vcopy["WM_GW_ID"] == settings.gateway_id assert vcopy["WM_GW_MODEL"] == settings.gateway_model @@ -202,7 +214,8 @@ def test_defaults(): assert settings.mqtt_reconnect_delay == 0 assert settings.buffering_max_buffered_packets == 0 assert settings.buffering_max_delay_without_publish == 0 - assert settings.buffering_minimal_sink_cost == 0 + assert settings.cache_update_s == 20 + assert settings.cache_time_window == 1200 assert settings.gateway_id is None assert settings.gateway_model is None assert settings.gateway_version is None diff --git a/python_transport/wirepas_gateway/protocol/mqtt_wrapper.py b/python_transport/wirepas_gateway/protocol/mqtt_wrapper.py index cdf69756..9859c9f3 100644 --- a/python_transport/wirepas_gateway/protocol/mqtt_wrapper.py +++ b/python_transport/wirepas_gateway/protocol/mqtt_wrapper.py @@ -233,7 +233,7 @@ def _get_socket(self): def _set_last_will(self, topic, data): # Set Last wil message - self._client.will_set(topic, data, qos=2, retain=True) + self._client.will_set(topic, data, qos=1, retain=True) def run(self): self.running = True diff --git a/python_transport/wirepas_gateway/transport_service.py b/python_transport/wirepas_gateway/transport_service.py index 3a0849e5..c65a606a 100644 --- a/python_transport/wirepas_gateway/transport_service.py +++ b/python_transport/wirepas_gateway/transport_service.py @@ -8,7 +8,7 @@ import wirepas_mesh_messaging as wmm from time import time, sleep from uuid import getnode -from threading import Thread +from threading import Thread, Lock from wirepas_gateway.dbus.dbus_client import BusClient from wirepas_gateway.protocol.topic_helper import TopicGenerator, TopicParser @@ -156,6 +156,83 @@ def initialize_sink(self, name): sink.cost = self.minimum_sink_cost +class MessageManager: + def __init__( + self, + time_window, + cache_update_s + ): + """ + Manage and cache messages to avoid sending duplicate in MQTT with QoS=1. + + Args: + time_window: time in seconds after which a message is clean from the cache. + cache_update_s: period in seconds to update the list of received messages. + Must be shorter than time_window + """ + self._lock = Lock() + self.msg_list = dict() # Dictionary of received messages id mapped to their timestamps + + self.time_window = time_window + self.cache_update_s = min(cache_update_s, time_window) + + self.latest_timestamp = 0 # latest timestamp of a received message + self.last_update_time = 0 # last time an update of the list of received messages was done + + def add_msg(self, msg_id): + """ + Adds a received message to the cache. + If the message is a duplicate, it only updates its last timestamp to the current time. + + Args: + msg_id: the id of a message to be added to the cache. + """ + time_s_epoch = time() + with self._lock: + self._update_latest_timestamp(time_s_epoch) + + if self.latest_timestamp - self.last_update_time > self.cache_update_s: + self._update_msg_list() + + is_duplicate = self._is_duplicate(msg_id) + self.msg_list[msg_id] = time_s_epoch + return not is_duplicate + + def get_size(self): + """ + Returns the number of messages in the cache. + """ + with self._lock: + return len(self.msg_list) + + def _update_msg_list(self): + """ + Clean the old messages in the cache + based on cleaning time period time_window attribute. + """ + self.msg_list = { + msg_id: msg_time for (msg_id, msg_time) in self.msg_list.items() + if self.latest_timestamp - msg_time <= self.time_window + } + self.last_update_time = self.latest_timestamp + + def _update_latest_timestamp(self, timestamp): + """ + Updates the latest timestamp of a received message. + """ + self.latest_timestamp = max(self.latest_timestamp, timestamp) + + def _is_duplicate(self, msg_id): + """ + Returns True if the id of a message is already in the cache. + Return False otherwise. + + Args: + msg_id: id of the message received + """ + return msg_id in self.msg_list + + class TransportService(BusClient): """ Implementation of gateway to backend protocol @@ -227,6 +304,8 @@ def __init__(self, settings, **kwargs): else: self.data_event_id = None + self.msg_manager = MessageManager(settings.cache_time_window, settings.cache_update_s) + def _on_mqtt_wrapper_termination_cb(self): """ Callback used to be informed when the MQTT wrapper has exited @@ -384,7 +463,7 @@ def _send_asynchronous_set_config_response(self, name): sink.read_config(), ) topic = TopicGenerator.make_set_config_response_topic(self.gw_id, sink.sink_id) - self.mqtt_wrapper.publish(topic, response.payload, qos=2) + self.mqtt_wrapper.publish(topic, response.payload, qos=1) def _send_asynchronous_get_configs_response(self): # Create a list of different sink configs @@ -401,7 +480,7 @@ def _send_asynchronous_get_configs_response(self): ) topic = TopicGenerator.make_get_configs_response_topic(self.gw_id) - self.mqtt_wrapper.publish(topic, response.payload, qos=2) + self.mqtt_wrapper.publish(topic, response.payload, qos=1) def deferred_thread(fn): """ @@ -451,6 +530,9 @@ def _on_send_data_cmd_received(self, client, userdata, message): logging.error(str(e)) return + if not self.msg_manager.add_msg(request.req_id): + return + # Get the sink-id from topic _, sink_id = TopicParser.parse_send_data_topic(message.topic) @@ -480,7 +562,7 @@ def _on_send_data_cmd_received(self, client, userdata, message): response = wmm.SendDataResponse(request.req_id, self.gw_id, res, sink_id) topic = TopicGenerator.make_send_data_response_topic(self.gw_id, sink_id) - self.mqtt_wrapper.publish(topic, response.payload, qos=2) + self.mqtt_wrapper.publish(topic, response.payload, qos=1) @deferred_thread def _on_get_configs_cmd_received(self, client, userdata, message): @@ -492,6 +574,9 @@ def _on_get_configs_cmd_received(self, client, userdata, message): logging.error(str(e)) return + if not self.msg_manager.add_msg(request.req_id): + return + # Create a list of different sink configs configs = [] for sink in self.sink_manager.get_sinks(): @@ -504,7 +589,7 @@ def _on_get_configs_cmd_received(self, client, userdata, message): ) topic = TopicGenerator.make_get_configs_response_topic(self.gw_id) - self.mqtt_wrapper.publish(topic, response.payload, qos=2) + self.mqtt_wrapper.publish(topic, response.payload, qos=1) @deferred_thread def _on_own_status_received(self, client, userdata, message): @@ -536,6 +621,9 @@ def _on_get_gateway_info_cmd_received(self, client, userdata, message): logging.error(str(e)) return + if not self.msg_manager.add_msg(request.req_id): + return + response = wmm.GetGatewayInfoResponse( request.req_id, self.gw_id, @@ -547,7 +635,7 @@ def _on_get_gateway_info_cmd_received(self, client, userdata, message): ) topic = TopicGenerator.make_get_gateway_info_response_topic(self.gw_id) - self.mqtt_wrapper.publish(topic, response.payload, qos=2) + self.mqtt_wrapper.publish(topic, response.payload, qos=1) @deferred_thread def _on_set_config_cmd_received(self, client, userdata, message): @@ -575,7 +663,7 @@ def _on_set_config_cmd_received(self, client, userdata, message): self.gw_id, request.sink_id ) - self.mqtt_wrapper.publish(topic, response.payload, qos=2) + self.mqtt_wrapper.publish(topic, response.payload, qos=1) @deferred_thread def _on_otap_status_request_received(self, client, userdata, message): @@ -587,6 +675,9 @@ def _on_otap_status_request_received(self, client, userdata, message): logging.error(str(e)) return + if not self.msg_manager.add_msg(request.req_id): + return + sink = self.sink_manager.get_sink(request.sink_id) if sink is not None: d = sink.get_scratchpad_status() @@ -625,7 +716,7 @@ def _on_otap_status_request_received(self, client, userdata, message): self.gw_id, request.sink_id ) - self.mqtt_wrapper.publish(topic, response.payload, qos=2) + self.mqtt_wrapper.publish(topic, response.payload, qos=1) @deferred_thread def _on_otap_upload_scratchpad_request_received(self, client, userdata, message): @@ -639,6 +730,9 @@ def _on_otap_upload_scratchpad_request_received(self, client, userdata, message) logging.info("OTAP upload request received for %s", request.sink_id) + if not self.msg_manager.add_msg(request.req_id): + return + sink = self.sink_manager.get_sink(request.sink_id) if sink is not None: res = sink.upload_scratchpad(request.seq, request.scratchpad) @@ -653,7 +747,7 @@ def _on_otap_upload_scratchpad_request_received(self, client, userdata, message) self.gw_id, request.sink_id ) - self.mqtt_wrapper.publish(topic, response.payload, qos=2) + self.mqtt_wrapper.publish(topic, response.payload, qos=1) @deferred_thread def _on_otap_process_scratchpad_request_received(self, client, userdata, message): @@ -665,6 +759,9 @@ def _on_otap_process_scratchpad_request_received(self, client, userdata, message logging.error(str(e)) return + if not self.msg_manager.add_msg(request.req_id): + return + sink = self.sink_manager.get_sink(request.sink_id) if sink is not None: res = sink.process_scratchpad() @@ -679,7 +776,7 @@ def _on_otap_process_scratchpad_request_received(self, client, userdata, message self.gw_id, request.sink_id ) - self.mqtt_wrapper.publish(topic, response.payload, qos=2) + self.mqtt_wrapper.publish(topic, response.payload, qos=1) @deferred_thread def _on_otap_set_target_scratchpad_request_received( @@ -700,6 +797,9 @@ def _on_otap_set_target_scratchpad_request_received( logging.error("Action is mandatory") res = wmm.GatewayResultCode.GW_RES_INVALID_PARAM + if not self.msg_manager.add_msg(request.req_id): + return + if res == wmm.GatewayResultCode.GW_RES_OK: # Get optional params (None if not present) seq = request.target.get("target_sequence") @@ -744,7 +844,7 @@ def _on_otap_set_target_scratchpad_request_received( self.gw_id, request.sink_id ) - self.mqtt_wrapper.publish(topic, response.payload, qos=2) + self.mqtt_wrapper.publish(topic, response.payload, qos=1) def parse_setting_list(list_setting): @@ -900,6 +1000,7 @@ def main(): parse.add_gateway_config() parse.add_filtering_config() parse.add_buffering_settings() + parse.add_cache_settings() parse.add_debug_settings() parse.add_deprecated_args() diff --git a/python_transport/wirepas_gateway/utils/argument_tools.py b/python_transport/wirepas_gateway/utils/argument_tools.py index 2f45b1ec..0c483767 100644 --- a/python_transport/wirepas_gateway/utils/argument_tools.py +++ b/python_transport/wirepas_gateway/utils/argument_tools.py @@ -402,6 +402,32 @@ def add_buffering_settings(self): ), ) + def add_cache_settings(self): + """ Parameters used to avoid sending redundant messages in the mqtt broker """ + cleaning_time_window_default = 20*60 # 20 minutes + cache_update_s_default = 20 # 20 secondes + + self.cache.add_argument( + "--cache_time_window", + default=os.environ.get("WM_CACHE_TIME_WINDOW", cleaning_time_window_default), + action="store", + type=self.str2int, + help=( + "Time in seconds after which a message is clean from the cache." + ), + ) + + self.cache.add_argument( + "--cache_update_s", + default=os.environ.get("WM_CACHE_UPDATE_S", cache_update_s_default), + action="store", + type=self.str2int, + help=( + "Period to update the list of received messages in seconds." + "Must be smaller than time_window" + ), + ) + def add_debug_settings(self): self.debug.add_argument( "--debug_incr_data_event_id", From 7284e09b85fe024be5ff1181bf72444e7bebfccd Mon Sep 17 00:00:00 2001 From: LePailleurThibault Date: Fri, 4 Nov 2022 15:10:57 +0100 Subject: [PATCH 2/3] MQTT compatibility with QoS Some MQTT brokers do not support qos=2. Changing transport service to publish with qos=1. Adding a cache in transport service to avoid sending redundant messages. --- python_transport/tests/test_arguments.py | 3 + python_transport/tests/test_cache_message.py | 55 +++++++++++++ .../wirepas_gateway/protocol/cache_message.py | 76 ++++++++++++++++++ .../wirepas_gateway/transport_service.py | 80 +------------------ 4 files changed, 136 insertions(+), 78 deletions(-) create mode 100644 python_transport/tests/test_cache_message.py create mode 100644 python_transport/wirepas_gateway/protocol/cache_message.py diff --git a/python_transport/tests/test_arguments.py b/python_transport/tests/test_arguments.py index 4edb9dad..2b13622a 100644 --- a/python_transport/tests/test_arguments.py +++ b/python_transport/tests/test_arguments.py @@ -107,6 +107,7 @@ def get_args(delete_bools=False, set_env=True, set_file=False): parse.add_gateway_config() parse.add_filtering_config() parse.add_buffering_settings() + parse.add_cache_settings() settings = parse.settings() @@ -196,6 +197,7 @@ def test_defaults(): parse.add_gateway_config() parse.add_filtering_config() parse.add_buffering_settings() + parse.add_cache_settings() sys.argv = [sys.argv[0]] settings = parse.settings() @@ -214,6 +216,7 @@ def test_defaults(): assert settings.mqtt_reconnect_delay == 0 assert settings.buffering_max_buffered_packets == 0 assert settings.buffering_max_delay_without_publish == 0 + assert settings.buffering_minimal_sink_cost == 0 assert settings.cache_update_s == 20 assert settings.cache_time_window == 1200 assert settings.gateway_id is None diff --git a/python_transport/tests/test_cache_message.py b/python_transport/tests/test_cache_message.py new file mode 100644 index 00000000..72377201 --- /dev/null +++ b/python_transport/tests/test_cache_message.py @@ -0,0 +1,55 @@ +from time import sleep +from wirepas_gateway.protocol.cache_message import CacheMessage + +CACHE_TIME_WINDOW = 0.1 +CACHE_UPDATE_S = 0.025 +REQ_ID = 160 +REQ_ID2 = 161 + + +def test_adding_msg(): + """ + Tests the adding of message in the cache. + """ + cache = CacheMessage(CACHE_TIME_WINDOW, CACHE_UPDATE_S) + assert cache.get_size() == 0 + + assert cache.add_msg(REQ_ID) is True + assert cache.get_size() == 1 + + assert cache.add_msg(REQ_ID2) is True + assert cache.get_size() == 2 + + # Test the mapping in the cache + assert cache.is_in_cache(REQ_ID) + assert cache.is_in_cache(REQ_ID2) + +def test_duplicate(): + """ + Tests if the duplicate messages are not stored + """ + cache = CacheMessage(CACHE_TIME_WINDOW, CACHE_UPDATE_S) + assert cache.get_size() == 0 + + assert cache.add_msg(REQ_ID) is True + assert cache.get_size() == 1 + time1 = cache.msg_list[REQ_ID] + + # Test if a redundant message is dropped + assert cache.add_msg(REQ_ID) is False + assert cache.get_size() == 1 + + # Test if the timestamp of the message has been updated + time2 = cache.msg_list[REQ_ID] + assert time1 != time2 + +def test_cache_time_window(): + """ + Tests the well functionning of cache time window. + """ + cache = CacheMessage(CACHE_TIME_WINDOW, CACHE_UPDATE_S) + assert cache.add_msg(REQ_ID) is True + + # Test if the cache has reset eventually + sleep(CACHE_TIME_WINDOW) + assert cache.add_msg(REQ_ID) is True diff --git a/python_transport/wirepas_gateway/protocol/cache_message.py b/python_transport/wirepas_gateway/protocol/cache_message.py new file mode 100644 index 00000000..cdccba41 --- /dev/null +++ b/python_transport/wirepas_gateway/protocol/cache_message.py @@ -0,0 +1,76 @@ +# Copyright 2019 Wirepas Ltd licensed under Apache License, Version 2.0 +# +# See file LICENSE for full license details. +# +from time import time +from threading import Lock + + +class CacheMessage: + """ + Class to cache messages to avoid sending duplicates in MQTT with QoS=1. + """ + def __init__( + self, + cache_time_window, + cache_update_s + ): + """ + Args: + cache_time_window: time in seconds after which a message is clean from the cache. + cache_update_s: period in seconds to update the list of received messages. + Must be shorter than cache_time_window + """ + self._lock = Lock() + self.msg_list = dict() # Dictionary of received messages id mapped to their timestamps + + self.cache_time_window = cache_time_window + self.cache_update_s = min(cache_update_s, cache_time_window) + + self.last_update_time = 0 # last time an update of the list of received messages was done + + def add_msg(self, msg_id): + """ + Adds a received message to the cache. + If the message is a duplicate, it only updates its last timestamp to the current time. + + Args: + msg_id: the id of a message to be added to the cache. + """ + current_time = time() + with self._lock: + if current_time - self.last_update_time > self.cache_update_s: + self._clean_msg_list() + + is_duplicate = self.is_in_cache(msg_id) + self.msg_list[msg_id] = current_time + return not is_duplicate + + def get_size(self): + """ + Returns the number of messages in the cache. + """ + with self._lock: + return len(self.msg_list) + + def _clean_msg_list(self): + """ + Clean the old messages in the cache + based on the time window cache_time_window attribute. + """ + current_time = time() + self.msg_list = { + msg_id: msg_time for (msg_id, msg_time) in self.msg_list.items() + if current_time - msg_time <= self.cache_time_window + } + self.last_update_time = current_time + + def is_in_cache(self, msg_id): + """ + Returns True if the id of a message is already in the cache. + Return False otherwise. + + Args: + msg_id: id of the message received + """ + return msg_id in self.msg_list diff --git a/python_transport/wirepas_gateway/transport_service.py b/python_transport/wirepas_gateway/transport_service.py index c65a606a..0b16765e 100644 --- a/python_transport/wirepas_gateway/transport_service.py +++ b/python_transport/wirepas_gateway/transport_service.py @@ -13,6 +13,7 @@ from wirepas_gateway.dbus.dbus_client import BusClient from wirepas_gateway.protocol.topic_helper import TopicGenerator, TopicParser from wirepas_gateway.protocol.mqtt_wrapper import MQTTWrapper +from wirepas_gateway.protocol.cache_message import CacheMessage from wirepas_gateway.utils import ParserHelper from wirepas_gateway import __version__ as transport_version @@ -156,83 +157,6 @@ def initialize_sink(self, name): sink.cost = self.minimum_sink_cost -class MessageManager: - def __init__( - self, - time_window, - cache_update_s - ): - """ - Manage and cache messages to avoid sending duplicate in MQTT with QoS=1. - - Args: - time_window: time in seconds after which a message is clean from the cache. - cache_update_s: period in seconds to update the list of received messages. - Must be shorter than time_window - """ - self._lock = Lock() - self.msg_list = dict() # Dictionary of received messages id mapped to their timestamps - - self.time_window = time_window - self.cache_update_s = min(cache_update_s, time_window) - - self.latest_timestamp = 0 # latest timestamp of a received message - self.last_update_time = 0 # last time an update of the list of received messages was done - - def add_msg(self, msg_id): - """ - Adds a received message to the cache. - If the message is a duplicate, it only updates its last timestamp to the current time. - - Args: - msg_id: the id of a message to be added to the cache. - """ - time_s_epoch = time() - with self._lock: - self._update_latest_timestamp(time_s_epoch) - - if self.latest_timestamp - self.last_update_time > self.cache_update_s: - self._update_msg_list() - - is_duplicate = self._is_duplicate(msg_id) - self.msg_list[msg_id] = time_s_epoch - return not is_duplicate - - def get_size(self): - """ - Returns the number of messages in the cache. - """ - with self._lock: - return len(self.msg_list) - - def _update_msg_list(self): - """ - Clean the old messages in the cache - based on cleaning time period time_window attribute. - """ - self.msg_list = { - msg_id: msg_time for (msg_id, msg_time) in self.msg_list.items() - if self.latest_timestamp - msg_time <= self.time_window - } - self.last_update_time = self.latest_timestamp - - def _update_latest_timestamp(self, timestamp): - """ - Updates the latest timestamp of a received message. - """ - self.latest_timestamp = max(self.latest_timestamp, timestamp) - - def _is_duplicate(self, msg_id): - """ - Returns True if the id of a message is already in the cache. - Return False otherwise. - - Args: - msg_id: id of the message received - """ - return msg_id in self.msg_list - - class TransportService(BusClient): """ Implementation of gateway to backend protocol @@ -304,7 +228,7 @@ def __init__(self, settings, **kwargs): else: self.data_event_id = None - self.msg_manager = MessageManager(settings.cache_time_window, settings.cache_update_s) + self.msg_manager = CacheMessage(settings.cache_time_window, settings.cache_update_s) def _on_mqtt_wrapper_termination_cb(self): """ From 307c3a221173beb18ba93d656c4cd203b8ebf388 Mon Sep 17 00:00:00 2001 From: LePailleurThibault Date: Mon, 2 Jan 2023 17:32:04 +0100 Subject: [PATCH 3/3] Fixes on PR #238 Renaming class to MessageCache + normalizing variable names for better visibility. Cache is being cleaned with a thread and not with messages addition. QoS has been removed from mqtt option on publish and subscription, as only qos=1 should be used with the cache. Warnings have been added when redundancy is dectected at transport service level. --- python_transport/tests/test_arguments.py | 10 +-- python_transport/tests/test_cache_message.py | 50 ++++++----- .../wirepas_gateway/protocol/cache_message.py | 76 ---------------- .../wirepas_gateway/protocol/message_cache.py | 87 +++++++++++++++++++ .../wirepas_gateway/protocol/mqtt_wrapper.py | 25 +++--- .../wirepas_gateway/transport_service.py | 76 +++++++++------- .../wirepas_gateway/utils/argument_tools.py | 34 ++++---- 7 files changed, 197 insertions(+), 161 deletions(-) delete mode 100644 python_transport/wirepas_gateway/protocol/cache_message.py create mode 100644 python_transport/wirepas_gateway/protocol/message_cache.py diff --git a/python_transport/tests/test_arguments.py b/python_transport/tests/test_arguments.py index 2b13622a..e3d0c672 100644 --- a/python_transport/tests/test_arguments.py +++ b/python_transport/tests/test_arguments.py @@ -22,7 +22,7 @@ env_vars["WM_GW_BUFFERING_MAX_BUFFERED_PACKETS"] = 1000 env_vars["WM_GW_BUFFERING_MAX_DELAY_WITHOUT_PUBLISH"] = 128 env_vars["WM_GW_BUFFERING_MINIMAL_SINK_COST"] = 240 -env_vars["WM_CACHE_TIME_WINDOW"] = 1800 +env_vars["WM_CACHE_TIME_WINDOW_S"] = 1800 env_vars["WM_CACHE_UPDATE_S"] = 60 env_vars["WM_GW_ID"] = "1" env_vars["WM_GW_MODEL"] = "test" @@ -56,7 +56,7 @@ "WM_GW_BUFFERING_MAX_DELAY_WITHOUT_PUBLISH" ] file_vars["buffering_minimal_sink_cost"] = env_vars["WM_GW_BUFFERING_MINIMAL_SINK_COST"] -file_vars["cache_time_window"] = env_vars["WM_CACHE_TIME_WINDOW"] +file_vars["cache_time_window_s"] = env_vars["WM_CACHE_TIME_WINDOW_S"] file_vars["cache_update_s"] = env_vars["WM_CACHE_UPDATE_S"] file_vars["gateway_id"] = env_vars["WM_GW_ID"] @@ -159,8 +159,8 @@ def content_tests(settings, vcopy): == settings.buffering_minimal_sink_cost ) assert ( - vcopy["WM_CACHE_TIME_WINDOW"] - == settings.cache_time_window + vcopy["WM_CACHE_TIME_WINDOW_S"] + == settings.cache_time_window_s ) assert ( vcopy["WM_CACHE_UPDATE_S"] @@ -218,7 +218,7 @@ def test_defaults(): assert settings.buffering_max_delay_without_publish == 0 assert settings.buffering_minimal_sink_cost == 0 assert settings.cache_update_s == 20 - assert settings.cache_time_window == 1200 + assert settings.cache_time_window_s == 1200 assert settings.gateway_id is None assert settings.gateway_model is None assert settings.gateway_version is None diff --git a/python_transport/tests/test_cache_message.py b/python_transport/tests/test_cache_message.py index 72377201..37532a4e 100644 --- a/python_transport/tests/test_cache_message.py +++ b/python_transport/tests/test_cache_message.py @@ -1,7 +1,7 @@ from time import sleep -from wirepas_gateway.protocol.cache_message import CacheMessage +from wirepas_gateway.protocol.message_cache import MessageCache -CACHE_TIME_WINDOW = 0.1 +CACHE_TIME_WINDOW_S = 0.1 CACHE_UPDATE_S = 0.025 REQ_ID = 160 REQ_ID2 = 161 @@ -11,45 +11,47 @@ def test_adding_msg(): """ Tests the adding of message in the cache. """ - cache = CacheMessage(CACHE_TIME_WINDOW, CACHE_UPDATE_S) - assert cache.get_size() == 0 + message_cache = MessageCache(CACHE_TIME_WINDOW_S, CACHE_UPDATE_S) + assert message_cache.get_size() == 0 - assert cache.add_msg(REQ_ID) is True - assert cache.get_size() == 1 + assert message_cache.add_msg(REQ_ID) is True + assert message_cache.get_size() == 1 - assert cache.add_msg(REQ_ID2) is True - assert cache.get_size() == 2 + assert message_cache.add_msg(REQ_ID2) is True + assert message_cache.get_size() == 2 # Test the mapping in the cache - assert cache.is_in_cache(REQ_ID) - assert cache.is_in_cache(REQ_ID2) + assert message_cache.is_in_cache(REQ_ID) + assert message_cache.is_in_cache(REQ_ID2) + def test_duplicate(): """ Tests if the duplicate messages are not stored """ - cache = CacheMessage(CACHE_TIME_WINDOW, CACHE_UPDATE_S) - assert cache.get_size() == 0 + message_cache = MessageCache(CACHE_TIME_WINDOW_S, CACHE_UPDATE_S) + assert message_cache.get_size() == 0 - assert cache.add_msg(REQ_ID) is True - assert cache.get_size() == 1 - time1 = cache.msg_list[REQ_ID] + assert message_cache.add_msg(REQ_ID) is True + assert message_cache.get_size() == 1 # Test if a redundant message is dropped - assert cache.add_msg(REQ_ID) is False - assert cache.get_size() == 1 + assert message_cache.add_msg(REQ_ID) is False + assert message_cache.get_size() == 1 - # Test if the timestamp of the message has been updated - time2 = cache.msg_list[REQ_ID] - assert time1 != time2 def test_cache_time_window(): """ Tests the well functionning of cache time window. """ - cache = CacheMessage(CACHE_TIME_WINDOW, CACHE_UPDATE_S) - assert cache.add_msg(REQ_ID) is True + message_cache = MessageCache(CACHE_TIME_WINDOW_S, CACHE_UPDATE_S) + assert message_cache.add_msg(REQ_ID) is True # Test if the cache has reset eventually - sleep(CACHE_TIME_WINDOW) - assert cache.add_msg(REQ_ID) is True + sleep(CACHE_TIME_WINDOW_S/2) + assert message_cache.is_in_cache(REQ_ID) is True + sleep(CACHE_TIME_WINDOW_S) + assert message_cache.is_in_cache(REQ_ID) is False + + # And that a message with the same id can be added after the time window + assert message_cache.add_msg(REQ_ID) is True diff --git a/python_transport/wirepas_gateway/protocol/cache_message.py b/python_transport/wirepas_gateway/protocol/cache_message.py deleted file mode 100644 index cdccba41..00000000 --- a/python_transport/wirepas_gateway/protocol/cache_message.py +++ /dev/null @@ -1,76 +0,0 @@ -# Copyright 2019 Wirepas Ltd licensed under Apache License, Version 2.0 -# -# See file LICENSE for full license details. -# -from time import time -from threading import Lock - - -class CacheMessage: - """ - Class to cache messages to avoid sending duplicates in MQTT with QoS=1. - """ - def __init__( - self, - cache_time_window, - cache_update_s - ): - """ - Args: - cache_time_window: time in seconds after which a message is clean from the cache. - cache_update_s: period in seconds to update the list of received messages. - Must be shorter than cache_time_window - """ - self._lock = Lock() - self.msg_list = dict() # Dictionary of received messages id mapped to their timestamps - - self.cache_time_window = cache_time_window - self.cache_update_s = min(cache_update_s, cache_time_window) - - self.last_update_time = 0 # last time an update of the list of received messages was done - - def add_msg(self, msg_id): - """ - Adds a received message to the cache. - If the message is a duplicate, it only updates its last timestamp to the current time. - - Args: - msg_id: the id of a message to be added to the cache. - """ - current_time = time() - with self._lock: - if current_time - self.last_update_time > self.cache_update_s: - self._clean_msg_list() - - is_duplicate = self.is_in_cache(msg_id) - self.msg_list[msg_id] = current_time - return not is_duplicate - - def get_size(self): - """ - Returns the number of messages in the cache. - """ - with self._lock: - return len(self.msg_list) - - def _clean_msg_list(self): - """ - Clean the old messages in the cache - based on the time window cache_time_window attribute. - """ - current_time = time() - self.msg_list = { - msg_id: msg_time for (msg_id, msg_time) in self.msg_list.items() - if current_time - msg_time <= self.cache_time_window - } - self.last_update_time = current_time - - def is_in_cache(self, msg_id): - """ - Returns True if the id of a message is already in the cache. - Return False otherwise. - - Args: - msg_id: id of the message received - """ - return msg_id in self.msg_list diff --git a/python_transport/wirepas_gateway/protocol/message_cache.py b/python_transport/wirepas_gateway/protocol/message_cache.py new file mode 100644 index 00000000..8d47b4f4 --- /dev/null +++ b/python_transport/wirepas_gateway/protocol/message_cache.py @@ -0,0 +1,87 @@ +# Copyright 2022 Wirepas Ltd licensed under Apache License, Version 2.0 +# +# See file LICENSE for full license details. +# +from time import time, sleep +from threading import Lock, Thread + + +class MessageCache: + """ + Class to cache all recent messages to avoid sending duplicates in MQTT. + """ + def __init__( + self, + cache_time_window_s, + cache_update_s + ): + """ + Args: + cache_time_window_s: time in seconds after which a message is removed from the cache. + cache_update_s: period in seconds to update the list of received messages. + Must be shorter than cache_time_window_s + """ + self._lock = Lock() + # Dictionary of received messages id mapped to their timestamps + self._cache = dict() + # last time an update of the list of received messages was done + self._last_update_time = 0 + + self.cache_time_window_s = cache_time_window_s + self.cache_update_s = min(cache_update_s, cache_time_window_s) + + # Start a thread that cleans periodically the cache. + self._clean_cache_thread = Thread(target=self._clean_cache_thread, daemon=True) + self._clean_cache_thread.start() + + def add_msg(self, msg_id): + """ + Adds a received message to the cache. If the message is a duplicate, + it only updates its last timestamp to the current time. + + Args: + msg_id: the id of a message to be added to the cache. + """ + current_time = time() + is_duplicate = self.is_in_cache(msg_id) + with self._lock: + self._cache[msg_id] = current_time + return not is_duplicate + + def get_size(self): + """ + Returns the number of messages in the cache. + """ + with self._lock: + return len(self._cache) + + def _clean_cache(self): + """ + Clean the old messages in the cache + based on the time window cache_time_window_s attribute. + """ + with self._lock: + current_time = time() + self._cache = { + msg_id: msg_time for (msg_id, msg_time) in self._cache.items() + if current_time - msg_time <= self.cache_time_window_s + } + self._last_update_time = current_time + + def _clean_cache_thread(self): + """ + Clean the cache and sleep cache_update_s seconds in loop. + """ + while True: + self._clean_cache() + sleep(self.cache_update_s) + + def is_in_cache(self, msg_id): + """ + Return True if the id of a message is already in the cache, False otherwise. + + Args: + msg_id: id of the message + """ + with self._lock: + return msg_id in self._cache diff --git a/python_transport/wirepas_gateway/protocol/mqtt_wrapper.py b/python_transport/wirepas_gateway/protocol/mqtt_wrapper.py index 9859c9f3..b9b877ab 100644 --- a/python_transport/wirepas_gateway/protocol/mqtt_wrapper.py +++ b/python_transport/wirepas_gateway/protocol/mqtt_wrapper.py @@ -162,10 +162,10 @@ def _do_select(self, sock): # Publish everything. Loop is not necessary as # next select will exit immediately if queue not empty while True: - topic, payload, qos, retain = self._publish_queue.get() + topic, payload, retain = self._publish_queue.get() self._publish_from_wrapper_thread( - topic, payload, qos=qos, retain=retain + topic, payload, retain=retain ) # FIX: read internal sockpairR as it is written but @@ -267,37 +267,42 @@ def run(self): # thread has exited self.on_termination_cb() - def _publish_from_wrapper_thread(self, topic, payload, qos, retain): + def _publish_from_wrapper_thread(self, topic, payload, retain): """Internal method to publish on Mqtt. This method is only called from mqtt wrapper thread to avoid races. Args: topic: Topic to publish on payload: Payload - qos: Qos to use retain: Is it a retain message """ - mid = self._client.publish(topic, payload, qos=qos, retain=retain).mid + mid = self._client.publish(topic, payload, qos=1, retain=retain).mid self._unpublished_mid_set.add(mid) - def publish(self, topic, payload, qos=1, retain=False) -> None: + def publish(self, topic, payload, retain=False) -> None: """ Method to publish to Mqtt from any thread Args: topic: Topic to publish on payload: Payload - qos: Qos to use retain: Is it a retain message """ # Send it to the queue to be published from Mqtt thread - self._publish_queue.put((topic, payload, qos, retain)) + self._publish_queue.put((topic, payload, retain)) self._publish_monitor.on_publish_request() - def subscribe(self, topic, cb, qos=2) -> None: + def subscribe(self, topic, cb) -> None: + """ Method to subscribe to mqtt topic + + Args: + topic: Topic to subscribe to + cb: Callback to call on message reception + + """ logging.debug("Subscribing to: {}".format(topic)) - self._client.subscribe(topic, qos) + self._client.subscribe(topic, qos=1) self._client.message_callback_add(topic, cb) @property diff --git a/python_transport/wirepas_gateway/transport_service.py b/python_transport/wirepas_gateway/transport_service.py index 0b16765e..452dba91 100644 --- a/python_transport/wirepas_gateway/transport_service.py +++ b/python_transport/wirepas_gateway/transport_service.py @@ -8,12 +8,12 @@ import wirepas_mesh_messaging as wmm from time import time, sleep from uuid import getnode -from threading import Thread, Lock +from threading import Thread from wirepas_gateway.dbus.dbus_client import BusClient from wirepas_gateway.protocol.topic_helper import TopicGenerator, TopicParser from wirepas_gateway.protocol.mqtt_wrapper import MQTTWrapper -from wirepas_gateway.protocol.cache_message import CacheMessage +from wirepas_gateway.protocol.message_cache import MessageCache from wirepas_gateway.utils import ParserHelper from wirepas_gateway import __version__ as transport_version @@ -228,7 +228,10 @@ def __init__(self, settings, **kwargs): else: self.data_event_id = None - self.msg_manager = CacheMessage(settings.cache_time_window, settings.cache_update_s) + self.message_cache = MessageCache( + settings.cache_time_window_s, + settings.cache_update_s + ) def _on_mqtt_wrapper_termination_cb(self): """ @@ -244,7 +247,7 @@ def _set_status(self): topic = TopicGenerator.make_status_topic(self.gw_id) - self.mqtt_wrapper.publish(topic, event_online.payload, qos=1, retain=True) + self.mqtt_wrapper.publish(topic, event_online.payload, retain=True) def _on_connect(self): # Register for get gateway info @@ -262,10 +265,7 @@ def _on_connect(self): # Register for send data request for any sink on the gateway topic = TopicGenerator.make_send_data_request_topic(self.gw_id) logging.debug("Subscribing to: %s", topic) - # It is important to have a qos of 2 and also from the publisher as 1 could generate - # duplicated packets and we don't know the consequences on end - # application - self.mqtt_wrapper.subscribe(topic, self._on_send_data_cmd_received, qos=2) + self.mqtt_wrapper.subscribe(topic, self._on_send_data_cmd_received) # Register for otap commands for any sink on the gateway topic = TopicGenerator.make_otap_status_request_topic(self.gw_id) @@ -359,10 +359,7 @@ def on_data_received( if self.data_event_id is not None: self.data_event_id += 1 - # Set qos to 1 to avoid loading too much the broker - # unique id in event header can be used for duplicate filtering in - # backends - self.mqtt_wrapper.publish(topic, event.payload, qos=1) + self.mqtt_wrapper.publish(topic, event.payload) def on_stack_started(self, name): logging.debug("Sink started: %s", name) @@ -387,7 +384,7 @@ def _send_asynchronous_set_config_response(self, name): sink.read_config(), ) topic = TopicGenerator.make_set_config_response_topic(self.gw_id, sink.sink_id) - self.mqtt_wrapper.publish(topic, response.payload, qos=1) + self.mqtt_wrapper.publish(topic, response.payload) def _send_asynchronous_get_configs_response(self): # Create a list of different sink configs @@ -404,7 +401,7 @@ def _send_asynchronous_get_configs_response(self): ) topic = TopicGenerator.make_get_configs_response_topic(self.gw_id) - self.mqtt_wrapper.publish(topic, response.payload, qos=1) + self.mqtt_wrapper.publish(topic, response.payload) def deferred_thread(fn): """ @@ -454,7 +451,10 @@ def _on_send_data_cmd_received(self, client, userdata, message): logging.error(str(e)) return - if not self.msg_manager.add_msg(request.req_id): + # Add the message to the cache, if it can't be added, + # the function should not send back any response to the request + if not self.message_cache.add_msg(request.req_id): + logging.warning(f"_on_send_data_cmd_received: A message with id={request.req_id} is already present in the cache !") return # Get the sink-id from topic @@ -486,7 +486,7 @@ def _on_send_data_cmd_received(self, client, userdata, message): response = wmm.SendDataResponse(request.req_id, self.gw_id, res, sink_id) topic = TopicGenerator.make_send_data_response_topic(self.gw_id, sink_id) - self.mqtt_wrapper.publish(topic, response.payload, qos=1) + self.mqtt_wrapper.publish(topic, response.payload) @deferred_thread def _on_get_configs_cmd_received(self, client, userdata, message): @@ -498,7 +498,10 @@ def _on_get_configs_cmd_received(self, client, userdata, message): logging.error(str(e)) return - if not self.msg_manager.add_msg(request.req_id): + # Add the message to the cache, if it can't be added, + # the function should not send back any response to the request + if not self.message_cache.add_msg(request.req_id): + logging.warning(f"_on_get_configs_cmd_received: A message with id={request.req_id} is already present in the cache !") return # Create a list of different sink configs @@ -513,7 +516,7 @@ def _on_get_configs_cmd_received(self, client, userdata, message): ) topic = TopicGenerator.make_get_configs_response_topic(self.gw_id) - self.mqtt_wrapper.publish(topic, response.payload, qos=1) + self.mqtt_wrapper.publish(topic, response.payload) @deferred_thread def _on_own_status_received(self, client, userdata, message): @@ -545,7 +548,8 @@ def _on_get_gateway_info_cmd_received(self, client, userdata, message): logging.error(str(e)) return - if not self.msg_manager.add_msg(request.req_id): + if not self.message_cache.add_msg(request.req_id): + logging.warning(f"_on_get_gateway_info_cmd_received: A message with id={request.req_id} is already present in the cache !") return response = wmm.GetGatewayInfoResponse( @@ -559,7 +563,7 @@ def _on_get_gateway_info_cmd_received(self, client, userdata, message): ) topic = TopicGenerator.make_get_gateway_info_response_topic(self.gw_id) - self.mqtt_wrapper.publish(topic, response.payload, qos=1) + self.mqtt_wrapper.publish(topic, response.payload) @deferred_thread def _on_set_config_cmd_received(self, client, userdata, message): @@ -571,6 +575,10 @@ def _on_set_config_cmd_received(self, client, userdata, message): logging.error(str(e)) return + if not self.message_cache.add_msg(request.req_id): + logging.warning(f"_on_set_config_cmd_received: A message with id={request.req_id} is already present in the cache !") + return + logging.debug("Set sink config: %s", request) sink = self.sink_manager.get_sink(request.sink_id) if sink is not None: @@ -587,7 +595,7 @@ def _on_set_config_cmd_received(self, client, userdata, message): self.gw_id, request.sink_id ) - self.mqtt_wrapper.publish(topic, response.payload, qos=1) + self.mqtt_wrapper.publish(topic, response.payload) @deferred_thread def _on_otap_status_request_received(self, client, userdata, message): @@ -599,7 +607,10 @@ def _on_otap_status_request_received(self, client, userdata, message): logging.error(str(e)) return - if not self.msg_manager.add_msg(request.req_id): + # Add the message to the cache, if it can't be added, + # the function should not send back any response to the request + if not self.message_cache.add_msg(request.req_id): + logging.warning(f"_on_otap_status_request_received: A message with id={request.req_id} is already present in the cache !") return sink = self.sink_manager.get_sink(request.sink_id) @@ -640,7 +651,7 @@ def _on_otap_status_request_received(self, client, userdata, message): self.gw_id, request.sink_id ) - self.mqtt_wrapper.publish(topic, response.payload, qos=1) + self.mqtt_wrapper.publish(topic, response.payload) @deferred_thread def _on_otap_upload_scratchpad_request_received(self, client, userdata, message): @@ -654,7 +665,10 @@ def _on_otap_upload_scratchpad_request_received(self, client, userdata, message) logging.info("OTAP upload request received for %s", request.sink_id) - if not self.msg_manager.add_msg(request.req_id): + # Add the message to the cache, if it can't be added, + # the function should not send back any mqtt message + if not self.message_cache.add_msg(request.req_id): + logging.warning(f"_on_otap_upload_scratchpad_request_received: A message with id={request.req_id} is already present in the cache !") return sink = self.sink_manager.get_sink(request.sink_id) @@ -671,7 +685,7 @@ def _on_otap_upload_scratchpad_request_received(self, client, userdata, message) self.gw_id, request.sink_id ) - self.mqtt_wrapper.publish(topic, response.payload, qos=1) + self.mqtt_wrapper.publish(topic, response.payload) @deferred_thread def _on_otap_process_scratchpad_request_received(self, client, userdata, message): @@ -683,7 +697,8 @@ def _on_otap_process_scratchpad_request_received(self, client, userdata, message logging.error(str(e)) return - if not self.msg_manager.add_msg(request.req_id): + if not self.message_cache.add_msg(request.req_id): + logging.warning(f"_on_otap_process_scratchpad_request_received: A message with id={request.req_id} is already present in the cache !") return sink = self.sink_manager.get_sink(request.sink_id) @@ -700,7 +715,7 @@ def _on_otap_process_scratchpad_request_received(self, client, userdata, message self.gw_id, request.sink_id ) - self.mqtt_wrapper.publish(topic, response.payload, qos=1) + self.mqtt_wrapper.publish(topic, response.payload) @deferred_thread def _on_otap_set_target_scratchpad_request_received( @@ -721,7 +736,10 @@ def _on_otap_set_target_scratchpad_request_received( logging.error("Action is mandatory") res = wmm.GatewayResultCode.GW_RES_INVALID_PARAM - if not self.msg_manager.add_msg(request.req_id): + # Add the message to the cache, if it can't be added, + # the function should not send back any message + if not self.message_cache.add_msg(request.req_id): + logging.warning(f"_on_otap_set_target_scratchpad_request_received: A message with id={request.req_id} is already present in the cache !") return if res == wmm.GatewayResultCode.GW_RES_OK: @@ -768,7 +786,7 @@ def _on_otap_set_target_scratchpad_request_received( self.gw_id, request.sink_id ) - self.mqtt_wrapper.publish(topic, response.payload, qos=1) + self.mqtt_wrapper.publish(topic, response.payload) def parse_setting_list(list_setting): diff --git a/python_transport/wirepas_gateway/utils/argument_tools.py b/python_transport/wirepas_gateway/utils/argument_tools.py index 0c483767..6b48bf1c 100644 --- a/python_transport/wirepas_gateway/utils/argument_tools.py +++ b/python_transport/wirepas_gateway/utils/argument_tools.py @@ -402,14 +402,28 @@ def add_buffering_settings(self): ), ) + def add_debug_settings(self): + self.debug.add_argument( + "--debug_incr_data_event_id", + default=os.environ.get("WM_SERVICES_DEBUG_INCR_EVENT_ID", False), + type=self.str2bool, + nargs="?", + const=True, + help=( + "When true the data received event id will be incremental " + "starting at 0 when service starts. Otherwise it will be " + "random 64 bits id." + ), + ) + def add_cache_settings(self): """ Parameters used to avoid sending redundant messages in the mqtt broker """ - cleaning_time_window_default = 20*60 # 20 minutes + cleaning_time_window_s_default = 1200 # 20 minutes cache_update_s_default = 20 # 20 secondes self.cache.add_argument( - "--cache_time_window", - default=os.environ.get("WM_CACHE_TIME_WINDOW", cleaning_time_window_default), + "--cache_time_window_s", + default=os.environ.get("WM_CACHE_TIME_WINDOW_S", cleaning_time_window_s_default), action="store", type=self.str2int, help=( @@ -428,20 +442,6 @@ def add_cache_settings(self): ), ) - def add_debug_settings(self): - self.debug.add_argument( - "--debug_incr_data_event_id", - default=os.environ.get("WM_SERVICES_DEBUG_INCR_EVENT_ID", False), - type=self.str2bool, - nargs="?", - const=True, - help=( - "When true the data received event id will be incremental " - "starting at 0 when service starts. Otherwise it will be " - "random 64 bits id." - ), - ) - @staticmethod def _deprecated_message(new_arg_name, deprecated_from="2.x"): """ Alerts the user that an argument will be deprecated within the