From eda673ea966c20df6057f21ec8a683bcdbd849ef Mon Sep 17 00:00:00 2001 From: root Date: Tue, 31 Oct 2017 18:20:36 +0530 Subject: [PATCH 1/2] Offline buffering mechanism --- liota/core/Readme.md | 42 +++++++ liota/core/offlineQueue.py | 127 +++++++++++++++++++ liota/core/offline_database.py | 152 +++++++++++++++++++++++ liota/dcc_comms/check_connection.py | 27 ++++ liota/dcc_comms/timeout_exceptions.py | 2 + liota/dccs/aws_iot.py | 4 +- liota/dccs/dcc.py | 82 ++++++++++-- liota/dccs/graphite.py | 8 +- liota/dccs/iotcc.py | 5 +- liota/lib/utilities/offline_buffering.py | 44 +++++++ packages/graphite.py | 6 +- 11 files changed, 483 insertions(+), 16 deletions(-) create mode 100644 liota/core/Readme.md create mode 100644 liota/core/offlineQueue.py create mode 100644 liota/core/offline_database.py create mode 100644 liota/dcc_comms/check_connection.py create mode 100644 liota/dcc_comms/timeout_exceptions.py create mode 100644 liota/lib/utilities/offline_buffering.py diff --git a/liota/core/Readme.md b/liota/core/Readme.md new file mode 100644 index 00000000..5a389e73 --- /dev/null +++ b/liota/core/Readme.md @@ -0,0 +1,42 @@ +# Offline data storage +If the client faces network disconnectivity, publish message can be stored as a persistent storage or in a temporary offline queue in which publish data will be added to an internal queue until the number of queued-up requests reaches the size limit of the queue. If the size of the queue is defined as negative integer it will act as a infinite queue. One can also choose the queue behaviour after it reaches it's specified size. If drop_oldest behaviour is set to be true, oldest publish message is dropped else the newest publish messages are dropped. One should specify the draining frequency in each case, which implies how data which has been stored will be published once the network connectivity is established. +You can also specify data_drain_size which speicifes ow much data will be drained at once after the internet connectivity is established again. By default both are set to 1. + +# Example +By default buffering_params is set to None, i.e buffering mechanism is disabled. +Suppose we want to create a persistent storage, while creating instance of DCC, we would pass the an instance of Buffering class along with it. + +``` +buffering = Buffering(persistent_storage=True, data_drain_size=10, draining_frequency=1) +graphite = Graphite(SocketDccComms(ip=config['GraphiteIP'],port=8080), + offline_buffering=buffering) +``` +Here data_drain_size is 1 and draining_frequency is 1 which specifies 10 messages will be sent per second. +For persistent storage a database will be created by the name of storage.db which will store all the messages while network connectivity is broken. +Once network connectivity is back messages will be removed from database as they get published. +In case of ```persistent_storage``` as ```False``` the queueing mechanism will be used by default, you can specify queue_size and other parameters like drop_oldest, data_drain_size and draining_frequency: +``` +buffering = Buffering(queue_size=-1,data_drain_size=10, draining_frequency=1) +``` +will create a queueing mechanism with infinite size and drop_behaviour by default is true, data_drain_size and draining_frequency can be any positive integer. +For queue with size 3 and drop_oldest behaviour set to true, +``` +buffering = Buffering(queue_size=3, drop_oldest=True, draining_frequency=1) +``` +As the publish message arrives the queue will be like this after 3 publish message arrive: +``` +['msg1', 'msg2', 'msg3'] +``` +As the fourth publish message arrives: +``` +['msg2', 'msg3', 'msg4'] +``` +For the fifth publish message: +``` +['msg3', 'msg4', 'msg5'] +``` +Similarly, if the drop_oldest behaviour is set to False: +``` +['msg1', 'msg2', 'msg3'] +``` +After this any new coming publish message will be dropped. diff --git a/liota/core/offlineQueue.py b/liota/core/offlineQueue.py new file mode 100644 index 00000000..c3debbcb --- /dev/null +++ b/liota/core/offlineQueue.py @@ -0,0 +1,127 @@ +# -*- coding: utf-8 -*- +# ----------------------------------------------------------------------------# +# Copyright © 2015-2016 VMware, Inc. All Rights Reserved. # +# # +# Licensed under the BSD 2-Clause License (the “License”); you may not use # +# this file except in compliance with the License. # +# # +# The BSD 2-Clause License # +# # +# Redistribution and use in source and binary forms, with or without # +# modification, are permitted provided that the following conditions are met:# +# # +# - Redistributions of source code must retain the above copyright notice, # +# this list of conditions and the following disclaimer. # +# # +# - Redistributions in binary form must reproduce the above copyright # +# notice, this list of conditions and the following disclaimer in the # +# documentation and/or other materials provided with the distribution. # +# # +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"# +# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE # +# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE # +# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE # +# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR # +# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF # +# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS # +# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN # +# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) # +# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF # +# THE POSSIBILITY OF SUCH DAMAGE. # +# ----------------------------------------------------------------------------# + +import logging +from collections import deque +import threading +import time +from liota.dcc_comms.dcc_comms import DCCComms +from liota.dcc_comms.check_connection import checkConnection + +log = logging.getLogger(__name__) + +class offlineQueue: + def __init__(self, queue_size, comms, conn=None, data_drain_size=1, drop_oldest=True, draining_frequency=0): + """ + :param size: size of the offline_queue, if negative implies infinite. + :param drop_oldest: if True oldest data will be dropped after size of queue is exceeded. + :param comms: comms instance of DCCComms + :param draining_frequency: frequency with which data will be published after internet connectivity established. + """ + if not isinstance(queue_size, int): + log.error("Size is expected of int type.") + raise TypeError("Size is expected of int type.") + if not isinstance(comms, DCCComms): + log.error("DCCComms object is expected.") + raise TypeError("DCCComms object is expected.") + if not isinstance(drop_oldest, bool): + log.error("drop_oldest/newest is expected of bool type.") + raise TypeError("drop_oldest is expected of bool type.") + if not isinstance(draining_frequency, float) and not isinstance(draining_frequency, int): + log.error("draining_frequency is expected of float or int type.") + raise TypeError("draining_frequency is expected of float or int type.") + try: + assert queue_size!=0 and draining_frequency>=0 + except AssertionError as e: + log.error("Size can't be zero, draining_frequency can't be negative.") + raise e("Size can't be zero, draining_frequency can't be negative.") + self.size = queue_size + self.drop_oldest = drop_oldest + if (self.size>0 and drop_oldest): + self.d = deque(maxlen=self.size) + else: + self.d = deque() + self.comms = comms + self.data_drain_size = data_drain_size + if conn is None: + self.conn = checkConnection() + else: + self.conn = conn + self.draining_frequency = draining_frequency + self.draining_in_progress = False + self._offlineQLock = threading.Lock() + + def append(self, data): + if (self.size<0): #for infinite length deque + self.d.append(data) + elif (self.size>0 and self.drop_oldest): #for deque with drop_oldest=True + if len(self.d) is self.size: + log.info("Message dropped: {}".format(self.d[0])) + self.d.append(data) + else: #for deque with drop_oldest=False + if len(self.d) is self.size: + log.info("Message dropped: {}".format(data)) + else: + self.d.append(data) + + def _drain(self): + self._offlineQLock.acquire() + data_drained = 0 + self.draining_in_progress = True + try: + while self.d: + if self.conn.check: + data = self.d.popleft() + self.comms.send(data) + data_drained+=1 + log.info("Data Drain: {}".format(data)) + else: #if internet conncetivity breaks while draining + log.warning("Internet broke while draining.") + break + if data_drained==self.data_drain_size: #if some amt. of data drained thread sleeps for specified draining_freqn. + print "Length of queue: ", len(self.d) + data_drained=0 + time.sleep(self.draining_frequency) + except Exception as e: + log.warning("Internet connectivity broke while draining.") + raise e + finally: + self.draining_in_progress = False + self._offlineQLock.release() + + def start_drain(self): + queueDrain = threading.Thread(target=self._drain) + queueDrain.daemon = True + queueDrain.start() + + def show(self): + print self.d diff --git a/liota/core/offline_database.py b/liota/core/offline_database.py new file mode 100644 index 00000000..b4df5464 --- /dev/null +++ b/liota/core/offline_database.py @@ -0,0 +1,152 @@ +# -*- coding: utf-8 -*- +# ----------------------------------------------------------------------------# +# Copyright © 2015-2016 VMware, Inc. All Rights Reserved. # +# # +# Licensed under the BSD 2-Clause License (the “License”); you may not use # +# this file except in compliance with the License. # +# # +# The BSD 2-Clause License # +# # +# Redistribution and use in source and binary forms, with or without # +# modification, are permitted provided that the following conditions are met:# +# # +# - Redistributions of source code must retain the above copyright notice, # +# this list of conditions and the following disclaimer. # +# # +# - Redistributions in binary form must reproduce the above copyright # +# notice, this list of conditions and the following disclaimer in the # +# documentation and/or other materials provided with the distribution. # +# # +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"# +# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE # +# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE # +# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE # +# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR # +# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF # +# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS # +# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN # +# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) # +# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF # +# THE POSSIBILITY OF SUCH DAMAGE. # +# ----------------------------------------------------------------------------# + +import logging +import sqlite3 +import threading +import time +from liota.dcc_comms.dcc_comms import DCCComms +from liota.dcc_comms.check_connection import checkConnection + +log = logging.getLogger(__name__) + +class offline_database: + def __init__(self, table_name, comms, conn=None, data_drain_size=1, draining_frequency=0): + """ + :param table_name: table_name in which message will be stored + :param comms: comms instance of DCCComms + :param draining_frequency: frequency with which data will be published after internet connectivity established. + """ + if not isinstance(table_name, basestring): + log.error("Table name should be a string.") + raise TypeError("Table name should be a string.") + if not isinstance(comms, DCCComms): + log.error("DCCComms object is expected.") + raise TypeError("DCCComms object is expected.") + if not isinstance(draining_frequency, float) and not isinstance(draining_frequency, int): + log.error("draining_frequency is expected of float or int type.") + raise TypeError("draining_frequency is expected of float or int type.") + try: + assert draining_frequency>=0 + except AssertionError as e: + log.error("draining_frequency can't be negative.") + raise e("draining_frequency can't be negative.") + self.table_name = table_name + if conn is None: + self.internet_conn = checkConnection() + else: + self.internet_conn = conn + self.draining_frequency = draining_frequency + self.data_drain_size = data_drain_size + self.comms = comms + self.flag_conn_open = False + self.draining_in_progress = False + self._offline_db_lock = threading.Lock() + self._create_table() + + def _create_table(self): + if self.flag_conn_open is False: + self.conn = sqlite3.connect('storage.db') + try: + with self.conn: + if not self.conn.execute("SELECT name FROM sqlite_master WHERE TYPE='table' AND name= ? ", (self.table_name,)).fetchone(): + self.conn.text_factory = str + self.flag_conn_open = True + self.cursor = self.conn.cursor() + self.cursor.execute("CREATE TABLE "+self.table_name+" (Message TEXT)") + self.cursor.close() + del self.cursor + else: + print "Table already there!!!" + except Exception as e: + raise e + finally: + self.flag_conn_open = False + self.conn.close() + + def add(self, message): + try: + self.conn = sqlite3.connect('storage.db') + self.flag_conn_open = True + with self.conn: + self.cursor = self.conn.cursor() + print "Adding data to "+ self.table_name + self.cursor.execute("INSERT INTO "+self.table_name+"(Message) VALUES (?);", (message,)) + self.cursor.close() + del self.cursor + except sqlite3.OperationalError as e: + raise e + finally: + self.conn.close() + self.flag_conn_open = False + + def _drain(self): + self._offline_db_lock.acquire() + self.conn = sqlite3.connect('storage.db') + self.flag_conn_open = True + self.draining_in_progress = True + self.cursor = self.conn.cursor() + self.del_cursor = self.conn.cursor() + data_drained = 0 + try: + for row in self.cursor.execute("SELECT Message FROM "+self.table_name): + if self.comms is not None and self.internet_conn.check : + try: + self.comms.send(row[0]) + log.info("Data Drain: {}".format(row[0])) + print "Data drained: ",row[0] + data_drained+=1 + self.del_cursor.execute("Delete from "+self.table_name+" where rowid IN (Select rowid from "+self.table_name+" limit 1);") + self.conn.commit() + except Exception as e: + raise e + else: #internet connectivity breaks while draining + log.warning("Internet broke while draining") + break + if data_drained==self.data_drain_size: #if some amt. of data drained thread sleeps for specified draining_freqn. + data_drained=0 + time.sleep(self.draining_frequency) + except Exception as e: + raise e + log.warning("Internet connectivity broke while draining.") + finally: + self.del_cursor.close() + del self.del_cursor + self.conn.close() + self.flag_conn_open = False + self.draining_in_progress = False + self._offline_db_lock.release() + + def start_drain(self): + queueDrain = threading.Thread(target=self._drain) + queueDrain.daemon = True + queueDrain.start() diff --git a/liota/dcc_comms/check_connection.py b/liota/dcc_comms/check_connection.py new file mode 100644 index 00000000..332b8b0d --- /dev/null +++ b/liota/dcc_comms/check_connection.py @@ -0,0 +1,27 @@ +import os +import threading +import time +from liota.dcc_comms.timeout_exceptions import timeoutException + +class checkConnection: + def __init__(self, interval=1, hostname = "8.8.8.8"): + self.interval = interval + self.hostname = hostname + self.check = 1 + self.thread = threading.Thread(target=self.run) + self.thread.daemon = True + self.thread.start() + + def run(self): + while True: + self.check = self.check_internet() + #z = self.thread.isAlive() + time.sleep(self.interval) + + def check_internet(self): + response = os.system("ping -c 1 " + self.hostname + " > /dev/null 2>&1") + if response == 0: + pingstatus = 1 + else: + pingstatus = 0 + return pingstatus diff --git a/liota/dcc_comms/timeout_exceptions.py b/liota/dcc_comms/timeout_exceptions.py new file mode 100644 index 00000000..9bc0d088 --- /dev/null +++ b/liota/dcc_comms/timeout_exceptions.py @@ -0,0 +1,2 @@ +class timeoutException(Exception): + pass diff --git a/liota/dccs/aws_iot.py b/liota/dccs/aws_iot.py index 9b5a6934..1c152159 100644 --- a/liota/dccs/aws_iot.py +++ b/liota/dccs/aws_iot.py @@ -49,13 +49,13 @@ class AWSIoT(DataCenterComponent): """ DCC for AWSIoT Platform. """ - def __init__(self, con, enclose_metadata=False): + def __init__(self, con, enclose_metadata=False, buffering_params=None): """ :param con: DccComms Object :param enclose_metadata: Include Gateway, Device and Metric names as part of payload or not """ super(AWSIoT, self).__init__( - comms=con + comms=con, buffering_params=buffering_params ) self.enclose_metadata = enclose_metadata diff --git a/liota/dccs/dcc.py b/liota/dccs/dcc.py index dfa867f1..b24f023c 100644 --- a/liota/dccs/dcc.py +++ b/liota/dccs/dcc.py @@ -31,28 +31,43 @@ # ----------------------------------------------------------------------------# import logging +import json from abc import ABCMeta, abstractmethod from liota.entities.entity import Entity from liota.dcc_comms.dcc_comms import DCCComms from liota.entities.metrics.registered_metric import RegisteredMetric +from liota.dcc_comms.check_connection import checkConnection +from liota.core.offlineQueue import offlineQueue +from liota.core.offline_database import offline_database +from liota.lib.utilities.offline_buffering import BufferingParams log = logging.getLogger(__name__) - class DataCenterComponent: - """ Abstract base class for all DCCs. """ __metaclass__ = ABCMeta @abstractmethod - def __init__(self, comms): + def __init__(self, comms, buffering_params): if not isinstance(comms, DCCComms): log.error("DCCComms object is expected.") raise TypeError("DCCComms object is expected.") + if not isinstance(buffering_params, BufferingParams): + log.error("Buffering object is expected.") + raise TypeError("Buffering object is expected.") self.comms = comms + self.buffering_params = buffering_params + if self.buffering_params is not None: + self.persistent_storage = self.buffering_params.persistent_storage + self.data_drain_size = self.buffering_params.data_drain_size + self.draining_frequency = self.buffering_params.draining_frequency + self.drop_oldest = self.buffering_params.drop_oldest + self.queue_size = self.buffering_params.queue_size + self.conn = checkConnection() + self.offline_buffering_enabled = False #False means offline buffering/storage is off else on # ----------------------------------------------------------------------- # Implement this method in subclasses and do actual registration. @@ -80,11 +95,63 @@ def publish(self, reg_metric): if not isinstance(reg_metric, RegisteredMetric): log.error("RegisteredMetric object is expected.") raise TypeError("RegisteredMetric object is expected.") + message = self._format_data(reg_metric) - if hasattr(reg_metric, 'msg_attr'): - self.comms.send(message, reg_metric.msg_attr) + if message is not None: + if self.buffering_params is not None: + if self.conn.check: + if self.offline_buffering_enabled: #checking if buffering is enabled or not, incase internet comes back after disconnectivity + self.offline_buffering_enabled = False + if self.persistent_storage is True: + log.info("Draining starts.") + self.offline_database.start_drain() + else: + self.offlineQ.start_drain() + try: + if hasattr(reg_metric, 'msg_attr'): + self.comms.send(message, reg_metric.msg_attr) + else: + self.comms.send(message, None) + except Exception as e: + raise e + else: #if no internet connectivity + if self.persistent_storage is True: + table_name = self.__class__.__name__ + type(self.comms).__name__ + self._start_database_storage(table_name, message) + else: + self._start_queuing(message) + else: + if hasattr(reg_metric, 'msg_attr'): + self.comms.send(message, reg_metric.msg_attr) + else: + self.comms.send(message, None) + + def _start_queuing(self, message): + if self.offline_buffering_enabled is False: + self.offline_buffering_enabled = True + try: + if self.offlineQ.draining_in_progress: + self.offlineQ.append(message) + except Exception as e: + self.offlineQ = offlineQueue(comms=self.comms, conn=self.conn, queue_size=self.queue_size, + data_drain_size=self.data_drain_size, drop_oldest=self.drop_oldest, + draining_frequency=self.draining_frequency) + log.info("Offline queueing started.") + self.offlineQ.append(message) + + def _start_database_storage(self, table_name, message): + if self.offline_buffering_enabled is False: + self.offline_buffering_enabled = True + try: + if self.offline_database.draining_in_progress: + self.offline_database.add(message) + except Exception as e: + self.offline_database = offline_database(table_name=table_name, comms=self.comms, conn=self.conn, + data_drain_size=self.data_drain_size, draining_frequency=self.draining_frequency) + log.info("Database created.") + self.offline_database.add(message) else: - self.comms.send(message, None) + self.offline_database.add(message) @abstractmethod def set_properties(self, reg_entity, properties): @@ -95,4 +162,5 @@ def unregister(self, entity_obj): if not isinstance(entity_obj, Entity): raise TypeError -class RegistrationFailure(Exception): pass +class RegistrationFailure(Exception): + pass diff --git a/liota/dccs/graphite.py b/liota/dccs/graphite.py index 24b2ff55..72bbd8b1 100644 --- a/liota/dccs/graphite.py +++ b/liota/dccs/graphite.py @@ -29,20 +29,20 @@ # ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF # # THE POSSIBILITY OF SUCH DAMAGE. # # ----------------------------------------------------------------------------# + import logging from liota.dccs.dcc import DataCenterComponent from liota.entities.metrics.registered_metric import RegisteredMetric from liota.entities.metrics.metric import Metric from liota.entities.registered_entity import RegisteredEntity - +from liota.lib.utilities.utility import getUTCmillis log = logging.getLogger(__name__) - class Graphite(DataCenterComponent): - def __init__(self, comms): + def __init__(self, comms, buffering_params=None): super(Graphite, self).__init__( - comms=comms + comms=comms,buffering_params=buffering_params ) def register(self, entity_obj): diff --git a/liota/dccs/iotcc.py b/liota/dccs/iotcc.py index f885a3c4..241754b0 100755 --- a/liota/dccs/iotcc.py +++ b/liota/dccs/iotcc.py @@ -57,7 +57,10 @@ class IotControlCenter(DataCenterComponent): """ - def __init__(self, con): + def __init__(self, con, buffering_params=None): + super(IotControlCenter, self).__init__( + buffering_params=buffering_params + ) log.info("Logging into DCC") self._version = 20171023 self.comms = con diff --git a/liota/lib/utilities/offline_buffering.py b/liota/lib/utilities/offline_buffering.py new file mode 100644 index 00000000..fc391980 --- /dev/null +++ b/liota/lib/utilities/offline_buffering.py @@ -0,0 +1,44 @@ +# -*- coding: utf-8 -*- +# ----------------------------------------------------------------------------# +# Copyright © 2015-2016 VMware, Inc. All Rights Reserved. # +# # +# Licensed under the BSD 2-Clause License (the “License”); you may not use # +# this file except in compliance with the License. # +# # +# The BSD 2-Clause License # +# # +# Redistribution and use in source and binary forms, with or without # +# modification, are permitted provided that the following conditions are met:# +# # +# - Redistributions of source code must retain the above copyright notice, # +# this list of conditions and the following disclaimer. # +# # +# - Redistributions in binary form must reproduce the above copyright # +# notice, this list of conditions and the following disclaimer in the # +# documentation and/or other materials provided with the distribution. # +# # +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"# +# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE # +# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE # +# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE # +# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR # +# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF # +# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS # +# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN # +# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) # +# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF # +# THE POSSIBILITY OF SUCH DAMAGE. # +# ----------------------------------------------------------------------------# + +import logging + +log = logging.getLogger(__name__) + +class BufferingParams: + def __init__(self, queue_size=-1, persistent_storage=False, data_drain_size=10, drop_oldest=True, draining_frequency=1): + self.persistent_storage = persistent_storage + self.queue_size = queue_size + self.data_drain_size = data_drain_size + self.drop_oldest = drop_oldest + self.draining_frequency = draining_frequency + diff --git a/packages/graphite.py b/packages/graphite.py index 9a5302e1..5d392e29 100644 --- a/packages/graphite.py +++ b/packages/graphite.py @@ -46,7 +46,8 @@ def run(self, registry): import copy from liota.dccs.graphite import Graphite from liota.dcc_comms.socket_comms import SocketDccComms - + from liota.lib.utilities.offline_buffering import BufferingParams + # Acquire resources from registry # Creating a copy of system object to keep original object "clean" edge_system = copy.copy(registry.get("edge_system")) @@ -56,9 +57,10 @@ def run(self, registry): config = read_user_config(config_path + '/sampleProp.conf') # Initialize DCC object with transport + offline_buffering = BufferingParams(persistent_storage=True, queue_size=-1, data_drain_size=10, draining_frequency=1) self.graphite = Graphite( SocketDccComms(ip=config['GraphiteIP'], - port=config['GraphitePort']) + port=config['GraphitePort']), buffering_params=offline_buffering ) # Register gateway system From d4d837446ebffde93f11be4e023d1c3bd2b81c4f Mon Sep 17 00:00:00 2001 From: root Date: Sun, 5 Nov 2017 17:47:35 +0530 Subject: [PATCH 2/2] Updated README.md --- liota/core/{Readme.md => README.md} | 0 liota/core/offlineQueue.py | 7 ++----- liota/core/offline_database.py | 8 ++++---- 3 files changed, 6 insertions(+), 9 deletions(-) rename liota/core/{Readme.md => README.md} (100%) diff --git a/liota/core/Readme.md b/liota/core/README.md similarity index 100% rename from liota/core/Readme.md rename to liota/core/README.md diff --git a/liota/core/offlineQueue.py b/liota/core/offlineQueue.py index c3debbcb..d2b32330 100644 --- a/liota/core/offlineQueue.py +++ b/liota/core/offlineQueue.py @@ -45,7 +45,8 @@ def __init__(self, queue_size, comms, conn=None, data_drain_size=1, drop_oldest= :param size: size of the offline_queue, if negative implies infinite. :param drop_oldest: if True oldest data will be dropped after size of queue is exceeded. :param comms: comms instance of DCCComms - :param draining_frequency: frequency with which data will be published after internet connectivity established. + :param data_drain_size: how many messages will be drained within each draining_frequency secs defined. + :param draining_frequency: frequency with which data will be published after internet connectivity established(like seconds). """ if not isinstance(queue_size, int): log.error("Size is expected of int type.") @@ -108,7 +109,6 @@ def _drain(self): log.warning("Internet broke while draining.") break if data_drained==self.data_drain_size: #if some amt. of data drained thread sleeps for specified draining_freqn. - print "Length of queue: ", len(self.d) data_drained=0 time.sleep(self.draining_frequency) except Exception as e: @@ -122,6 +122,3 @@ def start_drain(self): queueDrain = threading.Thread(target=self._drain) queueDrain.daemon = True queueDrain.start() - - def show(self): - print self.d diff --git a/liota/core/offline_database.py b/liota/core/offline_database.py index b4df5464..efe948cc 100644 --- a/liota/core/offline_database.py +++ b/liota/core/offline_database.py @@ -44,7 +44,8 @@ def __init__(self, table_name, comms, conn=None, data_drain_size=1, draining_fre """ :param table_name: table_name in which message will be stored :param comms: comms instance of DCCComms - :param draining_frequency: frequency with which data will be published after internet connectivity established. + :param data_drain_size: how many messages will be drained within each draining_frequency secs defined. + :param draining_frequency: frequency with which data will be published after internet connectivity established(like seconds). """ if not isinstance(table_name, basestring): log.error("Table name should be a string.") @@ -86,7 +87,7 @@ def _create_table(self): self.cursor.close() del self.cursor else: - print "Table already there!!!" + log.info("Table already there!!!") except Exception as e: raise e finally: @@ -99,7 +100,7 @@ def add(self, message): self.flag_conn_open = True with self.conn: self.cursor = self.conn.cursor() - print "Adding data to "+ self.table_name + log.info("Adding data to "+ self.table_name) self.cursor.execute("INSERT INTO "+self.table_name+"(Message) VALUES (?);", (message,)) self.cursor.close() del self.cursor @@ -123,7 +124,6 @@ def _drain(self): try: self.comms.send(row[0]) log.info("Data Drain: {}".format(row[0])) - print "Data drained: ",row[0] data_drained+=1 self.del_cursor.execute("Delete from "+self.table_name+" where rowid IN (Select rowid from "+self.table_name+" limit 1);") self.conn.commit()