diff --git a/python_transport/tests/test_arguments.py b/python_transport/tests/test_arguments.py index 9d2ebc52..e3d0c672 100644 --- a/python_transport/tests/test_arguments.py +++ b/python_transport/tests/test_arguments.py @@ -22,6 +22,8 @@ env_vars["WM_GW_BUFFERING_MAX_BUFFERED_PACKETS"] = 1000 env_vars["WM_GW_BUFFERING_MAX_DELAY_WITHOUT_PUBLISH"] = 128 env_vars["WM_GW_BUFFERING_MINIMAL_SINK_COST"] = 240 +env_vars["WM_CACHE_TIME_WINDOW_S"] = 1800 +env_vars["WM_CACHE_UPDATE_S"] = 60 env_vars["WM_GW_ID"] = "1" env_vars["WM_GW_MODEL"] = "test" env_vars["WM_GW_VERSION"] = "pytest" @@ -54,6 +56,8 @@ "WM_GW_BUFFERING_MAX_DELAY_WITHOUT_PUBLISH" ] file_vars["buffering_minimal_sink_cost"] = env_vars["WM_GW_BUFFERING_MINIMAL_SINK_COST"] +file_vars["cache_time_window_s"] = env_vars["WM_CACHE_TIME_WINDOW_S"] +file_vars["cache_update_s"] = env_vars["WM_CACHE_UPDATE_S"] file_vars["gateway_id"] = env_vars["WM_GW_ID"] file_vars["gateway_model"] = env_vars["WM_GW_MODEL"] @@ -103,6 +107,7 @@ def get_args(delete_bools=False, set_env=True, set_file=False): parse.add_gateway_config() parse.add_filtering_config() parse.add_buffering_settings() + parse.add_cache_settings() settings = parse.settings() @@ -153,6 +158,14 @@ def content_tests(settings, vcopy): vcopy["WM_GW_BUFFERING_MINIMAL_SINK_COST"] == settings.buffering_minimal_sink_cost ) + assert ( + vcopy["WM_CACHE_TIME_WINDOW_S"] + == settings.cache_time_window_s + ) + assert ( + vcopy["WM_CACHE_UPDATE_S"] + == settings.cache_update_s + ) assert vcopy["WM_GW_ID"] == settings.gateway_id assert vcopy["WM_GW_MODEL"] == settings.gateway_model @@ -184,6 +197,7 @@ def test_defaults(): parse.add_gateway_config() parse.add_filtering_config() parse.add_buffering_settings() + parse.add_cache_settings() sys.argv = [sys.argv[0]] settings = parse.settings() @@ -203,6 +217,8 @@ def test_defaults(): assert settings.buffering_max_buffered_packets == 0 assert settings.buffering_max_delay_without_publish == 0 assert settings.buffering_minimal_sink_cost == 0 + assert settings.cache_update_s == 20 + assert settings.cache_time_window_s == 1200 assert settings.gateway_id is None assert settings.gateway_model is None assert settings.gateway_version is None diff --git a/python_transport/tests/test_cache_message.py b/python_transport/tests/test_cache_message.py new file mode 100644 index 00000000..37532a4e --- /dev/null +++ b/python_transport/tests/test_cache_message.py @@ -0,0 +1,57 @@ +from time import sleep +from wirepas_gateway.protocol.message_cache import MessageCache + +CACHE_TIME_WINDOW_S = 0.1 +CACHE_UPDATE_S = 0.025 +REQ_ID = 160 +REQ_ID2 = 161 + + +def test_adding_msg(): + """ + Tests the adding of message in the cache. + """ + message_cache = MessageCache(CACHE_TIME_WINDOW_S, CACHE_UPDATE_S) + assert message_cache.get_size() == 0 + + assert message_cache.add_msg(REQ_ID) is True + assert message_cache.get_size() == 1 + + assert message_cache.add_msg(REQ_ID2) is True + assert message_cache.get_size() == 2 + + # Test the mapping in the cache + assert message_cache.is_in_cache(REQ_ID) + assert message_cache.is_in_cache(REQ_ID2) + + +def test_duplicate(): + """ + Tests if the duplicate messages are not stored + """ + message_cache = MessageCache(CACHE_TIME_WINDOW_S, CACHE_UPDATE_S) + assert message_cache.get_size() == 0 + + assert message_cache.add_msg(REQ_ID) is True + assert message_cache.get_size() == 1 + + # Test if a redundant message is dropped + assert message_cache.add_msg(REQ_ID) is False + assert message_cache.get_size() == 1 + + +def test_cache_time_window(): + """ + Tests the well functionning of cache time window. + """ + message_cache = MessageCache(CACHE_TIME_WINDOW_S, CACHE_UPDATE_S) + assert message_cache.add_msg(REQ_ID) is True + + # Test if the cache has reset eventually + sleep(CACHE_TIME_WINDOW_S/2) + assert message_cache.is_in_cache(REQ_ID) is True + sleep(CACHE_TIME_WINDOW_S) + assert message_cache.is_in_cache(REQ_ID) is False + + # And that a message with the same id can be added after the time window + assert message_cache.add_msg(REQ_ID) is True diff --git a/python_transport/wirepas_gateway/protocol/message_cache.py b/python_transport/wirepas_gateway/protocol/message_cache.py new file mode 100644 index 00000000..8d47b4f4 --- /dev/null +++ b/python_transport/wirepas_gateway/protocol/message_cache.py @@ -0,0 +1,87 @@ +# Copyright 2022 Wirepas Ltd licensed under Apache License, Version 2.0 +# +# See file LICENSE for full license details. +# +from time import time, sleep +from threading import Lock, Thread + + +class MessageCache: + """ + Class to cache all recent messages to avoid sending duplicates in MQTT. + """ + def __init__( + self, + cache_time_window_s, + cache_update_s + ): + """ + Args: + cache_time_window_s: time in seconds after which a message is removed from the cache. + cache_update_s: period in seconds to update the list of received messages. + Must be shorter than cache_time_window_s + """ + self._lock = Lock() + # Dictionary of received messages id mapped to their timestamps + self._cache = dict() + # last time an update of the list of received messages was done + self._last_update_time = 0 + + self.cache_time_window_s = cache_time_window_s + self.cache_update_s = min(cache_update_s, cache_time_window_s) + + # Start a thread that cleans periodically the cache. + self._clean_cache_thread = Thread(target=self._clean_cache_thread, daemon=True) + self._clean_cache_thread.start() + + def add_msg(self, msg_id): + """ + Adds a received message to the cache. If the message is a duplicate, + it only updates its last timestamp to the current time. + + Args: + msg_id: the id of a message to be added to the cache. + """ + current_time = time() + is_duplicate = self.is_in_cache(msg_id) + with self._lock: + self._cache[msg_id] = current_time + return not is_duplicate + + def get_size(self): + """ + Returns the number of messages in the cache. + """ + with self._lock: + return len(self._cache) + + def _clean_cache(self): + """ + Clean the old messages in the cache + based on the time window cache_time_window_s attribute. + """ + with self._lock: + current_time = time() + self._cache = { + msg_id: msg_time for (msg_id, msg_time) in self._cache.items() + if current_time - msg_time <= self.cache_time_window_s + } + self._last_update_time = current_time + + def _clean_cache_thread(self): + """ + Clean the cache and sleep cache_update_s seconds in loop. + """ + while True: + self._clean_cache() + sleep(self.cache_update_s) + + def is_in_cache(self, msg_id): + """ + Return True if the id of a message is already in the cache, False otherwise. + + Args: + msg_id: id of the message + """ + with self._lock: + return msg_id in self._cache diff --git a/python_transport/wirepas_gateway/protocol/mqtt_wrapper.py b/python_transport/wirepas_gateway/protocol/mqtt_wrapper.py index cdf69756..b9b877ab 100644 --- a/python_transport/wirepas_gateway/protocol/mqtt_wrapper.py +++ b/python_transport/wirepas_gateway/protocol/mqtt_wrapper.py @@ -162,10 +162,10 @@ def _do_select(self, sock): # Publish everything. Loop is not necessary as # next select will exit immediately if queue not empty while True: - topic, payload, qos, retain = self._publish_queue.get() + topic, payload, retain = self._publish_queue.get() self._publish_from_wrapper_thread( - topic, payload, qos=qos, retain=retain + topic, payload, retain=retain ) # FIX: read internal sockpairR as it is written but @@ -233,7 +233,7 @@ def _get_socket(self): def _set_last_will(self, topic, data): # Set Last wil message - self._client.will_set(topic, data, qos=2, retain=True) + self._client.will_set(topic, data, qos=1, retain=True) def run(self): self.running = True @@ -267,37 +267,42 @@ def run(self): # thread has exited self.on_termination_cb() - def _publish_from_wrapper_thread(self, topic, payload, qos, retain): + def _publish_from_wrapper_thread(self, topic, payload, retain): """Internal method to publish on Mqtt. This method is only called from mqtt wrapper thread to avoid races. Args: topic: Topic to publish on payload: Payload - qos: Qos to use retain: Is it a retain message """ - mid = self._client.publish(topic, payload, qos=qos, retain=retain).mid + mid = self._client.publish(topic, payload, qos=1, retain=retain).mid self._unpublished_mid_set.add(mid) - def publish(self, topic, payload, qos=1, retain=False) -> None: + def publish(self, topic, payload, retain=False) -> None: """ Method to publish to Mqtt from any thread Args: topic: Topic to publish on payload: Payload - qos: Qos to use retain: Is it a retain message """ # Send it to the queue to be published from Mqtt thread - self._publish_queue.put((topic, payload, qos, retain)) + self._publish_queue.put((topic, payload, retain)) self._publish_monitor.on_publish_request() - def subscribe(self, topic, cb, qos=2) -> None: + def subscribe(self, topic, cb) -> None: + """ Method to subscribe to mqtt topic + + Args: + topic: Topic to subscribe to + cb: Callback to call on message reception + + """ logging.debug("Subscribing to: {}".format(topic)) - self._client.subscribe(topic, qos) + self._client.subscribe(topic, qos=1) self._client.message_callback_add(topic, cb) @property diff --git a/python_transport/wirepas_gateway/transport_service.py b/python_transport/wirepas_gateway/transport_service.py index 3a0849e5..452dba91 100644 --- a/python_transport/wirepas_gateway/transport_service.py +++ b/python_transport/wirepas_gateway/transport_service.py @@ -13,6 +13,7 @@ from wirepas_gateway.dbus.dbus_client import BusClient from wirepas_gateway.protocol.topic_helper import TopicGenerator, TopicParser from wirepas_gateway.protocol.mqtt_wrapper import MQTTWrapper +from wirepas_gateway.protocol.message_cache import MessageCache from wirepas_gateway.utils import ParserHelper from wirepas_gateway import __version__ as transport_version @@ -227,6 +228,11 @@ def __init__(self, settings, **kwargs): else: self.data_event_id = None + self.message_cache = MessageCache( + settings.cache_time_window_s, + settings.cache_update_s + ) + def _on_mqtt_wrapper_termination_cb(self): """ Callback used to be informed when the MQTT wrapper has exited @@ -241,7 +247,7 @@ def _set_status(self): topic = TopicGenerator.make_status_topic(self.gw_id) - self.mqtt_wrapper.publish(topic, event_online.payload, qos=1, retain=True) + self.mqtt_wrapper.publish(topic, event_online.payload, retain=True) def _on_connect(self): # Register for get gateway info @@ -259,10 +265,7 @@ def _on_connect(self): # Register for send data request for any sink on the gateway topic = TopicGenerator.make_send_data_request_topic(self.gw_id) logging.debug("Subscribing to: %s", topic) - # It is important to have a qos of 2 and also from the publisher as 1 could generate - # duplicated packets and we don't know the consequences on end - # application - self.mqtt_wrapper.subscribe(topic, self._on_send_data_cmd_received, qos=2) + self.mqtt_wrapper.subscribe(topic, self._on_send_data_cmd_received) # Register for otap commands for any sink on the gateway topic = TopicGenerator.make_otap_status_request_topic(self.gw_id) @@ -356,10 +359,7 @@ def on_data_received( if self.data_event_id is not None: self.data_event_id += 1 - # Set qos to 1 to avoid loading too much the broker - # unique id in event header can be used for duplicate filtering in - # backends - self.mqtt_wrapper.publish(topic, event.payload, qos=1) + self.mqtt_wrapper.publish(topic, event.payload) def on_stack_started(self, name): logging.debug("Sink started: %s", name) @@ -384,7 +384,7 @@ def _send_asynchronous_set_config_response(self, name): sink.read_config(), ) topic = TopicGenerator.make_set_config_response_topic(self.gw_id, sink.sink_id) - self.mqtt_wrapper.publish(topic, response.payload, qos=2) + self.mqtt_wrapper.publish(topic, response.payload) def _send_asynchronous_get_configs_response(self): # Create a list of different sink configs @@ -401,7 +401,7 @@ def _send_asynchronous_get_configs_response(self): ) topic = TopicGenerator.make_get_configs_response_topic(self.gw_id) - self.mqtt_wrapper.publish(topic, response.payload, qos=2) + self.mqtt_wrapper.publish(topic, response.payload) def deferred_thread(fn): """ @@ -451,6 +451,12 @@ def _on_send_data_cmd_received(self, client, userdata, message): logging.error(str(e)) return + # Add the message to the cache, if it can't be added, + # the function should not send back any response to the request + if not self.message_cache.add_msg(request.req_id): + logging.warning(f"_on_send_data_cmd_received: A message with id={request.req_id} is already present in the cache !") + return + # Get the sink-id from topic _, sink_id = TopicParser.parse_send_data_topic(message.topic) @@ -480,7 +486,7 @@ def _on_send_data_cmd_received(self, client, userdata, message): response = wmm.SendDataResponse(request.req_id, self.gw_id, res, sink_id) topic = TopicGenerator.make_send_data_response_topic(self.gw_id, sink_id) - self.mqtt_wrapper.publish(topic, response.payload, qos=2) + self.mqtt_wrapper.publish(topic, response.payload) @deferred_thread def _on_get_configs_cmd_received(self, client, userdata, message): @@ -492,6 +498,12 @@ def _on_get_configs_cmd_received(self, client, userdata, message): logging.error(str(e)) return + # Add the message to the cache, if it can't be added, + # the function should not send back any response to the request + if not self.message_cache.add_msg(request.req_id): + logging.warning(f"_on_get_configs_cmd_received: A message with id={request.req_id} is already present in the cache !") + return + # Create a list of different sink configs configs = [] for sink in self.sink_manager.get_sinks(): @@ -504,7 +516,7 @@ def _on_get_configs_cmd_received(self, client, userdata, message): ) topic = TopicGenerator.make_get_configs_response_topic(self.gw_id) - self.mqtt_wrapper.publish(topic, response.payload, qos=2) + self.mqtt_wrapper.publish(topic, response.payload) @deferred_thread def _on_own_status_received(self, client, userdata, message): @@ -536,6 +548,10 @@ def _on_get_gateway_info_cmd_received(self, client, userdata, message): logging.error(str(e)) return + if not self.message_cache.add_msg(request.req_id): + logging.warning(f"_on_get_gateway_info_cmd_received: A message with id={request.req_id} is already present in the cache !") + return + response = wmm.GetGatewayInfoResponse( request.req_id, self.gw_id, @@ -547,7 +563,7 @@ def _on_get_gateway_info_cmd_received(self, client, userdata, message): ) topic = TopicGenerator.make_get_gateway_info_response_topic(self.gw_id) - self.mqtt_wrapper.publish(topic, response.payload, qos=2) + self.mqtt_wrapper.publish(topic, response.payload) @deferred_thread def _on_set_config_cmd_received(self, client, userdata, message): @@ -559,6 +575,10 @@ def _on_set_config_cmd_received(self, client, userdata, message): logging.error(str(e)) return + if not self.message_cache.add_msg(request.req_id): + logging.warning(f"_on_set_config_cmd_received: A message with id={request.req_id} is already present in the cache !") + return + logging.debug("Set sink config: %s", request) sink = self.sink_manager.get_sink(request.sink_id) if sink is not None: @@ -575,7 +595,7 @@ def _on_set_config_cmd_received(self, client, userdata, message): self.gw_id, request.sink_id ) - self.mqtt_wrapper.publish(topic, response.payload, qos=2) + self.mqtt_wrapper.publish(topic, response.payload) @deferred_thread def _on_otap_status_request_received(self, client, userdata, message): @@ -587,6 +607,12 @@ def _on_otap_status_request_received(self, client, userdata, message): logging.error(str(e)) return + # Add the message to the cache, if it can't be added, + # the function should not send back any response to the request + if not self.message_cache.add_msg(request.req_id): + logging.warning(f"_on_otap_status_request_received: A message with id={request.req_id} is already present in the cache !") + return + sink = self.sink_manager.get_sink(request.sink_id) if sink is not None: d = sink.get_scratchpad_status() @@ -625,7 +651,7 @@ def _on_otap_status_request_received(self, client, userdata, message): self.gw_id, request.sink_id ) - self.mqtt_wrapper.publish(topic, response.payload, qos=2) + self.mqtt_wrapper.publish(topic, response.payload) @deferred_thread def _on_otap_upload_scratchpad_request_received(self, client, userdata, message): @@ -639,6 +665,12 @@ def _on_otap_upload_scratchpad_request_received(self, client, userdata, message) logging.info("OTAP upload request received for %s", request.sink_id) + # Add the message to the cache, if it can't be added, + # the function should not send back any mqtt message + if not self.message_cache.add_msg(request.req_id): + logging.warning(f"_on_otap_upload_scratchpad_request_received: A message with id={request.req_id} is already present in the cache !") + return + sink = self.sink_manager.get_sink(request.sink_id) if sink is not None: res = sink.upload_scratchpad(request.seq, request.scratchpad) @@ -653,7 +685,7 @@ def _on_otap_upload_scratchpad_request_received(self, client, userdata, message) self.gw_id, request.sink_id ) - self.mqtt_wrapper.publish(topic, response.payload, qos=2) + self.mqtt_wrapper.publish(topic, response.payload) @deferred_thread def _on_otap_process_scratchpad_request_received(self, client, userdata, message): @@ -665,6 +697,10 @@ def _on_otap_process_scratchpad_request_received(self, client, userdata, message logging.error(str(e)) return + if not self.message_cache.add_msg(request.req_id): + logging.warning(f"_on_otap_process_scratchpad_request_received: A message with id={request.req_id} is already present in the cache !") + return + sink = self.sink_manager.get_sink(request.sink_id) if sink is not None: res = sink.process_scratchpad() @@ -679,7 +715,7 @@ def _on_otap_process_scratchpad_request_received(self, client, userdata, message self.gw_id, request.sink_id ) - self.mqtt_wrapper.publish(topic, response.payload, qos=2) + self.mqtt_wrapper.publish(topic, response.payload) @deferred_thread def _on_otap_set_target_scratchpad_request_received( @@ -700,6 +736,12 @@ def _on_otap_set_target_scratchpad_request_received( logging.error("Action is mandatory") res = wmm.GatewayResultCode.GW_RES_INVALID_PARAM + # Add the message to the cache, if it can't be added, + # the function should not send back any message + if not self.message_cache.add_msg(request.req_id): + logging.warning(f"_on_otap_set_target_scratchpad_request_received: A message with id={request.req_id} is already present in the cache !") + return + if res == wmm.GatewayResultCode.GW_RES_OK: # Get optional params (None if not present) seq = request.target.get("target_sequence") @@ -744,7 +786,7 @@ def _on_otap_set_target_scratchpad_request_received( self.gw_id, request.sink_id ) - self.mqtt_wrapper.publish(topic, response.payload, qos=2) + self.mqtt_wrapper.publish(topic, response.payload) def parse_setting_list(list_setting): @@ -900,6 +942,7 @@ def main(): parse.add_gateway_config() parse.add_filtering_config() parse.add_buffering_settings() + parse.add_cache_settings() parse.add_debug_settings() parse.add_deprecated_args() diff --git a/python_transport/wirepas_gateway/utils/argument_tools.py b/python_transport/wirepas_gateway/utils/argument_tools.py index 2f45b1ec..6b48bf1c 100644 --- a/python_transport/wirepas_gateway/utils/argument_tools.py +++ b/python_transport/wirepas_gateway/utils/argument_tools.py @@ -416,6 +416,32 @@ def add_debug_settings(self): ), ) + def add_cache_settings(self): + """ Parameters used to avoid sending redundant messages in the mqtt broker """ + cleaning_time_window_s_default = 1200 # 20 minutes + cache_update_s_default = 20 # 20 secondes + + self.cache.add_argument( + "--cache_time_window_s", + default=os.environ.get("WM_CACHE_TIME_WINDOW_S", cleaning_time_window_s_default), + action="store", + type=self.str2int, + help=( + "Time in seconds after which a message is clean from the cache." + ), + ) + + self.cache.add_argument( + "--cache_update_s", + default=os.environ.get("WM_CACHE_UPDATE_S", cache_update_s_default), + action="store", + type=self.str2int, + help=( + "Period to update the list of received messages in seconds." + "Must be smaller than time_window" + ), + ) + @staticmethod def _deprecated_message(new_arg_name, deprecated_from="2.x"): """ Alerts the user that an argument will be deprecated within the