Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions python_transport/tests/test_arguments.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@
env_vars["WM_GW_BUFFERING_MAX_BUFFERED_PACKETS"] = 1000
env_vars["WM_GW_BUFFERING_MAX_DELAY_WITHOUT_PUBLISH"] = 128
env_vars["WM_GW_BUFFERING_MINIMAL_SINK_COST"] = 240
env_vars["WM_CACHE_TIME_WINDOW_S"] = 1800
env_vars["WM_CACHE_UPDATE_S"] = 60
env_vars["WM_GW_ID"] = "1"
env_vars["WM_GW_MODEL"] = "test"
env_vars["WM_GW_VERSION"] = "pytest"
Expand Down Expand Up @@ -54,6 +56,8 @@
"WM_GW_BUFFERING_MAX_DELAY_WITHOUT_PUBLISH"
]
file_vars["buffering_minimal_sink_cost"] = env_vars["WM_GW_BUFFERING_MINIMAL_SINK_COST"]
file_vars["cache_time_window_s"] = env_vars["WM_CACHE_TIME_WINDOW_S"]
file_vars["cache_update_s"] = env_vars["WM_CACHE_UPDATE_S"]

file_vars["gateway_id"] = env_vars["WM_GW_ID"]
file_vars["gateway_model"] = env_vars["WM_GW_MODEL"]
Expand Down Expand Up @@ -103,6 +107,7 @@ def get_args(delete_bools=False, set_env=True, set_file=False):
parse.add_gateway_config()
parse.add_filtering_config()
parse.add_buffering_settings()
parse.add_cache_settings()

settings = parse.settings()

Expand Down Expand Up @@ -153,6 +158,14 @@ def content_tests(settings, vcopy):
vcopy["WM_GW_BUFFERING_MINIMAL_SINK_COST"]
== settings.buffering_minimal_sink_cost
)
assert (

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Black would make changes.

vcopy["WM_CACHE_TIME_WINDOW_S"]
== settings.cache_time_window_s
)
assert (
vcopy["WM_CACHE_UPDATE_S"]
== settings.cache_update_s
)

assert vcopy["WM_GW_ID"] == settings.gateway_id
assert vcopy["WM_GW_MODEL"] == settings.gateway_model
Expand Down Expand Up @@ -184,6 +197,7 @@ def test_defaults():
parse.add_gateway_config()
parse.add_filtering_config()
parse.add_buffering_settings()
parse.add_cache_settings()

sys.argv = [sys.argv[0]]
settings = parse.settings()
Expand All @@ -203,6 +217,8 @@ def test_defaults():
assert settings.buffering_max_buffered_packets == 0
assert settings.buffering_max_delay_without_publish == 0
assert settings.buffering_minimal_sink_cost == 0
assert settings.cache_update_s == 20
assert settings.cache_time_window_s == 1200
assert settings.gateway_id is None
assert settings.gateway_model is None
assert settings.gateway_version is None
Expand Down
57 changes: 57 additions & 0 deletions python_transport/tests/test_cache_message.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
from time import sleep
from wirepas_gateway.protocol.message_cache import MessageCache

CACHE_TIME_WINDOW_S = 0.1
CACHE_UPDATE_S = 0.025
REQ_ID = 160
REQ_ID2 = 161


def test_adding_msg():
"""
Tests the adding of message in the cache.
"""
message_cache = MessageCache(CACHE_TIME_WINDOW_S, CACHE_UPDATE_S)
assert message_cache.get_size() == 0

assert message_cache.add_msg(REQ_ID) is True
assert message_cache.get_size() == 1

assert message_cache.add_msg(REQ_ID2) is True
assert message_cache.get_size() == 2

# Test the mapping in the cache
assert message_cache.is_in_cache(REQ_ID)
assert message_cache.is_in_cache(REQ_ID2)


def test_duplicate():
"""
Tests if the duplicate messages are not stored
"""
message_cache = MessageCache(CACHE_TIME_WINDOW_S, CACHE_UPDATE_S)
assert message_cache.get_size() == 0

assert message_cache.add_msg(REQ_ID) is True
assert message_cache.get_size() == 1

# Test if a redundant message is dropped
assert message_cache.add_msg(REQ_ID) is False
assert message_cache.get_size() == 1


def test_cache_time_window():
"""
Tests the well functionning of cache time window.
"""
message_cache = MessageCache(CACHE_TIME_WINDOW_S, CACHE_UPDATE_S)
assert message_cache.add_msg(REQ_ID) is True

# Test if the cache has reset eventually
sleep(CACHE_TIME_WINDOW_S/2)

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Black would make changes.
missing whitespace around arithmetic operator

assert message_cache.is_in_cache(REQ_ID) is True
sleep(CACHE_TIME_WINDOW_S)
assert message_cache.is_in_cache(REQ_ID) is False

# And that a message with the same id can be added after the time window
assert message_cache.add_msg(REQ_ID) is True
87 changes: 87 additions & 0 deletions python_transport/wirepas_gateway/protocol/message_cache.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
# Copyright 2022 Wirepas Ltd licensed under Apache License, Version 2.0
#
# See file LICENSE for full license details.
#
from time import time, sleep
from threading import Lock, Thread


class MessageCache:
"""
Class to cache all recent messages to avoid sending duplicates in MQTT.
"""
def __init__(

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Black would make changes.

self,
cache_time_window_s,
cache_update_s
):
"""
Args:
cache_time_window_s: time in seconds after which a message is removed from the cache.

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

line too long (97 > 79 characters)

cache_update_s: period in seconds to update the list of received messages.
Must be shorter than cache_time_window_s
"""
self._lock = Lock()
# Dictionary of received messages id mapped to their timestamps
self._cache = dict()
# last time an update of the list of received messages was done
self._last_update_time = 0

self.cache_time_window_s = cache_time_window_s
self.cache_update_s = min(cache_update_s, cache_time_window_s)

# Start a thread that cleans periodically the cache.
self._clean_cache_thread = Thread(target=self._clean_cache_thread, daemon=True)

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

line too long (87 > 79 characters)

self._clean_cache_thread.start()

def add_msg(self, msg_id):
"""
Adds a received message to the cache. If the message is a duplicate,
it only updates its last timestamp to the current time.

Args:
msg_id: the id of a message to be added to the cache.
"""
current_time = time()
is_duplicate = self.is_in_cache(msg_id)
with self._lock:
self._cache[msg_id] = current_time
return not is_duplicate

def get_size(self):
"""
Returns the number of messages in the cache.
"""
with self._lock:
return len(self._cache)

def _clean_cache(self):
"""
Clean the old messages in the cache
based on the time window cache_time_window_s attribute.
"""
with self._lock:
current_time = time()
self._cache = {
msg_id: msg_time for (msg_id, msg_time) in self._cache.items()
if current_time - msg_time <= self.cache_time_window_s
}
self._last_update_time = current_time

def _clean_cache_thread(self):
"""
Clean the cache and sleep cache_update_s seconds in loop.
"""
while True:
self._clean_cache()
sleep(self.cache_update_s)

def is_in_cache(self, msg_id):
"""
Return True if the id of a message is already in the cache, False otherwise.

Args:
msg_id: id of the message
"""
with self._lock:
return msg_id in self._cache
27 changes: 16 additions & 11 deletions python_transport/wirepas_gateway/protocol/mqtt_wrapper.py
Original file line number Diff line number Diff line change
Expand Up @@ -162,10 +162,10 @@ def _do_select(self, sock):
# Publish everything. Loop is not necessary as
# next select will exit immediately if queue not empty
while True:
topic, payload, qos, retain = self._publish_queue.get()
topic, payload, retain = self._publish_queue.get()

self._publish_from_wrapper_thread(
topic, payload, qos=qos, retain=retain
topic, payload, retain=retain
)

# FIX: read internal sockpairR as it is written but
Expand Down Expand Up @@ -233,7 +233,7 @@ def _get_socket(self):

def _set_last_will(self, topic, data):
# Set Last wil message
self._client.will_set(topic, data, qos=2, retain=True)
self._client.will_set(topic, data, qos=1, retain=True)

def run(self):
self.running = True
Expand Down Expand Up @@ -267,37 +267,42 @@ def run(self):
# thread has exited
self.on_termination_cb()

def _publish_from_wrapper_thread(self, topic, payload, qos, retain):
def _publish_from_wrapper_thread(self, topic, payload, retain):
"""Internal method to publish on Mqtt. This method is only called from
mqtt wrapper thread to avoid races.

Args:
topic: Topic to publish on
payload: Payload
qos: Qos to use
retain: Is it a retain message

"""
mid = self._client.publish(topic, payload, qos=qos, retain=retain).mid
mid = self._client.publish(topic, payload, qos=1, retain=retain).mid
self._unpublished_mid_set.add(mid)

def publish(self, topic, payload, qos=1, retain=False) -> None:
def publish(self, topic, payload, retain=False) -> None:
""" Method to publish to Mqtt from any thread

Args:
topic: Topic to publish on
payload: Payload
qos: Qos to use
retain: Is it a retain message

"""
# Send it to the queue to be published from Mqtt thread
self._publish_queue.put((topic, payload, qos, retain))
self._publish_queue.put((topic, payload, retain))
self._publish_monitor.on_publish_request()

def subscribe(self, topic, cb, qos=2) -> None:
def subscribe(self, topic, cb) -> None:
""" Method to subscribe to mqtt topic

Args:
topic: Topic to subscribe to
cb: Callback to call on message reception

"""
logging.debug("Subscribing to: {}".format(topic))
self._client.subscribe(topic, qos)
self._client.subscribe(topic, qos=1)
self._client.message_callback_add(topic, cb)

@property
Expand Down
Loading