Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 42 additions & 0 deletions liota/core/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
# Offline data storage
If the client faces network disconnectivity, publish message can be stored as a persistent storage or in a temporary offline queue in which publish data will be added to an internal queue until the number of queued-up requests reaches the size limit of the queue. If the size of the queue is defined as negative integer it will act as a infinite queue. One can also choose the queue behaviour after it reaches it's specified size. If drop_oldest behaviour is set to be true, oldest publish message is dropped else the newest publish messages are dropped. One should specify the draining frequency in each case, which implies how data which has been stored will be published once the network connectivity is established.
You can also specify data_drain_size which speicifes ow much data will be drained at once after the internet connectivity is established again. By default both are set to 1.

# Example
By default buffering_params is set to None, i.e buffering mechanism is disabled.
Suppose we want to create a persistent storage, while creating instance of DCC, we would pass the an instance of Buffering class along with it.

```
buffering = Buffering(persistent_storage=True, data_drain_size=10, draining_frequency=1)
graphite = Graphite(SocketDccComms(ip=config['GraphiteIP'],port=8080),
offline_buffering=buffering)
```
Here data_drain_size is 1 and draining_frequency is 1 which specifies 10 messages will be sent per second.
For persistent storage a database will be created by the name of storage.db which will store all the messages while network connectivity is broken.
Once network connectivity is back messages will be removed from database as they get published.
In case of ```persistent_storage``` as ```False``` the queueing mechanism will be used by default, you can specify queue_size and other parameters like drop_oldest, data_drain_size and draining_frequency:
```
buffering = Buffering(queue_size=-1,data_drain_size=10, draining_frequency=1)
```
will create a queueing mechanism with infinite size and drop_behaviour by default is true, data_drain_size and draining_frequency can be any positive integer.
For queue with size 3 and drop_oldest behaviour set to true,
```
buffering = Buffering(queue_size=3, drop_oldest=True, draining_frequency=1)
```
As the publish message arrives the queue will be like this after 3 publish message arrive:
```
['msg1', 'msg2', 'msg3']
```
As the fourth publish message arrives:
```
['msg2', 'msg3', 'msg4']
```
For the fifth publish message:
```
['msg3', 'msg4', 'msg5']
```
Similarly, if the drop_oldest behaviour is set to False:
```
['msg1', 'msg2', 'msg3']
```
After this any new coming publish message will be dropped.
124 changes: 124 additions & 0 deletions liota/core/offlineQueue.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
# -*- coding: utf-8 -*-
# ----------------------------------------------------------------------------#
# Copyright © 2015-2016 VMware, Inc. All Rights Reserved. #
# #
# Licensed under the BSD 2-Clause License (the “License”); you may not use #
# this file except in compliance with the License. #
# #
# The BSD 2-Clause License #
# #
# Redistribution and use in source and binary forms, with or without #
# modification, are permitted provided that the following conditions are met:#
# #
# - Redistributions of source code must retain the above copyright notice, #
# this list of conditions and the following disclaimer. #
# #
# - Redistributions in binary form must reproduce the above copyright #
# notice, this list of conditions and the following disclaimer in the #
# documentation and/or other materials provided with the distribution. #
# #
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"#
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE #
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE #
# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE #
# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR #
# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF #
# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS #
# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN #
# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) #
# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF #
# THE POSSIBILITY OF SUCH DAMAGE. #
# ----------------------------------------------------------------------------#

import logging
from collections import deque
import threading
import time
from liota.dcc_comms.dcc_comms import DCCComms
from liota.dcc_comms.check_connection import checkConnection

log = logging.getLogger(__name__)

class offlineQueue:
def __init__(self, queue_size, comms, conn=None, data_drain_size=1, drop_oldest=True, draining_frequency=0):
"""
:param size: size of the offline_queue, if negative implies infinite.
:param drop_oldest: if True oldest data will be dropped after size of queue is exceeded.
:param comms: comms instance of DCCComms
:param data_drain_size: how many messages will be drained within each draining_frequency secs defined.
:param draining_frequency: frequency with which data will be published after internet connectivity established(like seconds).
"""
if not isinstance(queue_size, int):
log.error("Size is expected of int type.")
raise TypeError("Size is expected of int type.")
if not isinstance(comms, DCCComms):
log.error("DCCComms object is expected.")
raise TypeError("DCCComms object is expected.")
if not isinstance(drop_oldest, bool):
log.error("drop_oldest/newest is expected of bool type.")
raise TypeError("drop_oldest is expected of bool type.")
if not isinstance(draining_frequency, float) and not isinstance(draining_frequency, int):
log.error("draining_frequency is expected of float or int type.")
raise TypeError("draining_frequency is expected of float or int type.")
try:
assert queue_size!=0 and draining_frequency>=0
except AssertionError as e:
log.error("Size can't be zero, draining_frequency can't be negative.")
raise e("Size can't be zero, draining_frequency can't be negative.")
self.size = queue_size
self.drop_oldest = drop_oldest
if (self.size>0 and drop_oldest):
self.d = deque(maxlen=self.size)
else:
self.d = deque()
self.comms = comms
self.data_drain_size = data_drain_size
if conn is None:
self.conn = checkConnection()
else:
self.conn = conn
self.draining_frequency = draining_frequency
self.draining_in_progress = False
self._offlineQLock = threading.Lock()

def append(self, data):
if (self.size<0): #for infinite length deque
self.d.append(data)
elif (self.size>0 and self.drop_oldest): #for deque with drop_oldest=True
if len(self.d) is self.size:
log.info("Message dropped: {}".format(self.d[0]))
self.d.append(data)
else: #for deque with drop_oldest=False
if len(self.d) is self.size:
log.info("Message dropped: {}".format(data))
else:
self.d.append(data)

def _drain(self):
self._offlineQLock.acquire()
data_drained = 0
self.draining_in_progress = True
try:
while self.d:
if self.conn.check:
data = self.d.popleft()
self.comms.send(data)
data_drained+=1
log.info("Data Drain: {}".format(data))
else: #if internet conncetivity breaks while draining
log.warning("Internet broke while draining.")
break
if data_drained==self.data_drain_size: #if some amt. of data drained thread sleeps for specified draining_freqn.
data_drained=0
time.sleep(self.draining_frequency)
except Exception as e:
log.warning("Internet connectivity broke while draining.")
raise e
finally:
self.draining_in_progress = False
self._offlineQLock.release()

def start_drain(self):
queueDrain = threading.Thread(target=self._drain)
queueDrain.daemon = True
queueDrain.start()
152 changes: 152 additions & 0 deletions liota/core/offline_database.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,152 @@
# -*- coding: utf-8 -*-
# ----------------------------------------------------------------------------#
# Copyright © 2015-2016 VMware, Inc. All Rights Reserved. #
# #
# Licensed under the BSD 2-Clause License (the “License”); you may not use #
# this file except in compliance with the License. #
# #
# The BSD 2-Clause License #
# #
# Redistribution and use in source and binary forms, with or without #
# modification, are permitted provided that the following conditions are met:#
# #
# - Redistributions of source code must retain the above copyright notice, #
# this list of conditions and the following disclaimer. #
# #
# - Redistributions in binary form must reproduce the above copyright #
# notice, this list of conditions and the following disclaimer in the #
# documentation and/or other materials provided with the distribution. #
# #
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"#
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE #
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE #
# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE #
# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR #
# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF #
# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS #
# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN #
# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) #
# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF #
# THE POSSIBILITY OF SUCH DAMAGE. #
# ----------------------------------------------------------------------------#

import logging
import sqlite3
import threading
import time
from liota.dcc_comms.dcc_comms import DCCComms
from liota.dcc_comms.check_connection import checkConnection

log = logging.getLogger(__name__)

class offline_database:
def __init__(self, table_name, comms, conn=None, data_drain_size=1, draining_frequency=0):
"""
:param table_name: table_name in which message will be stored
:param comms: comms instance of DCCComms
:param data_drain_size: how many messages will be drained within each draining_frequency secs defined.
:param draining_frequency: frequency with which data will be published after internet connectivity established(like seconds).
"""
if not isinstance(table_name, basestring):
log.error("Table name should be a string.")
raise TypeError("Table name should be a string.")
if not isinstance(comms, DCCComms):
log.error("DCCComms object is expected.")
raise TypeError("DCCComms object is expected.")
if not isinstance(draining_frequency, float) and not isinstance(draining_frequency, int):
log.error("draining_frequency is expected of float or int type.")
raise TypeError("draining_frequency is expected of float or int type.")
try:
assert draining_frequency>=0
except AssertionError as e:
log.error("draining_frequency can't be negative.")
raise e("draining_frequency can't be negative.")
self.table_name = table_name
if conn is None:
self.internet_conn = checkConnection()
else:
self.internet_conn = conn
self.draining_frequency = draining_frequency
self.data_drain_size = data_drain_size
self.comms = comms
self.flag_conn_open = False
self.draining_in_progress = False
self._offline_db_lock = threading.Lock()
self._create_table()

def _create_table(self):
if self.flag_conn_open is False:
self.conn = sqlite3.connect('storage.db')
try:
with self.conn:
if not self.conn.execute("SELECT name FROM sqlite_master WHERE TYPE='table' AND name= ? ", (self.table_name,)).fetchone():
self.conn.text_factory = str
self.flag_conn_open = True
self.cursor = self.conn.cursor()
self.cursor.execute("CREATE TABLE "+self.table_name+" (Message TEXT)")
self.cursor.close()
del self.cursor
else:
log.info("Table already there!!!")
except Exception as e:
raise e
finally:
self.flag_conn_open = False
self.conn.close()

def add(self, message):
try:
self.conn = sqlite3.connect('storage.db')
self.flag_conn_open = True
with self.conn:
self.cursor = self.conn.cursor()
log.info("Adding data to "+ self.table_name)
self.cursor.execute("INSERT INTO "+self.table_name+"(Message) VALUES (?);", (message,))
self.cursor.close()
del self.cursor
except sqlite3.OperationalError as e:
raise e
finally:
self.conn.close()
self.flag_conn_open = False

def _drain(self):
self._offline_db_lock.acquire()
self.conn = sqlite3.connect('storage.db')
self.flag_conn_open = True
self.draining_in_progress = True
self.cursor = self.conn.cursor()
self.del_cursor = self.conn.cursor()
data_drained = 0
try:
for row in self.cursor.execute("SELECT Message FROM "+self.table_name):
if self.comms is not None and self.internet_conn.check :
try:
self.comms.send(row[0])
log.info("Data Drain: {}".format(row[0]))
data_drained+=1
self.del_cursor.execute("Delete from "+self.table_name+" where rowid IN (Select rowid from "+self.table_name+" limit 1);")
self.conn.commit()
except Exception as e:
raise e
else: #internet connectivity breaks while draining
log.warning("Internet broke while draining")
break
if data_drained==self.data_drain_size: #if some amt. of data drained thread sleeps for specified draining_freqn.
data_drained=0
time.sleep(self.draining_frequency)
except Exception as e:
raise e
log.warning("Internet connectivity broke while draining.")
finally:
self.del_cursor.close()
del self.del_cursor
self.conn.close()
self.flag_conn_open = False
self.draining_in_progress = False
self._offline_db_lock.release()

def start_drain(self):
queueDrain = threading.Thread(target=self._drain)
queueDrain.daemon = True
queueDrain.start()
27 changes: 27 additions & 0 deletions liota/dcc_comms/check_connection.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
import os
import threading
import time
from liota.dcc_comms.timeout_exceptions import timeoutException

class checkConnection:
def __init__(self, interval=1, hostname = "8.8.8.8"):
self.interval = interval
self.hostname = hostname
self.check = 1
self.thread = threading.Thread(target=self.run)
self.thread.daemon = True
self.thread.start()

def run(self):
while True:
self.check = self.check_internet()
#z = self.thread.isAlive()
time.sleep(self.interval)

def check_internet(self):
response = os.system("ping -c 1 " + self.hostname + " > /dev/null 2>&1")
if response == 0:
pingstatus = 1
else:
pingstatus = 0
return pingstatus
2 changes: 2 additions & 0 deletions liota/dcc_comms/timeout_exceptions.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
class timeoutException(Exception):
pass
4 changes: 2 additions & 2 deletions liota/dccs/aws_iot.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,13 +49,13 @@ class AWSIoT(DataCenterComponent):
"""
DCC for AWSIoT Platform.
"""
def __init__(self, con, enclose_metadata=False):
def __init__(self, con, enclose_metadata=False, buffering_params=None):
"""
:param con: DccComms Object
:param enclose_metadata: Include Gateway, Device and Metric names as part of payload or not
"""
super(AWSIoT, self).__init__(
comms=con
comms=con, buffering_params=buffering_params
)
self.enclose_metadata = enclose_metadata

Expand Down
Loading