From e6f55bedf254b352bcb31bbfde8f7b0b70ea3c18 Mon Sep 17 00:00:00 2001 From: root Date: Fri, 22 Sep 2017 15:49:36 +0530 Subject: [PATCH 01/12] Wavefront DCC and wavefront example --- liota/dccs/wavefront.py | 100 ++++++++++ .../wavefront/wavefront_bike_simulated.py | 188 ++++++++++++++++++ .../wavefront/wavefront_edge_system_stats.py | 141 +++++++++++++ packages/sampleProp.conf | 1 - packages/wavefront.py | 86 ++++++++ 5 files changed, 515 insertions(+), 1 deletion(-) create mode 100644 liota/dccs/wavefront.py create mode 100644 packages/examples/mqtt/wavefront/wavefront_bike_simulated.py create mode 100644 packages/examples/mqtt/wavefront/wavefront_edge_system_stats.py create mode 100644 packages/wavefront.py diff --git a/liota/dccs/wavefront.py b/liota/dccs/wavefront.py new file mode 100644 index 00000000..6ba162b1 --- /dev/null +++ b/liota/dccs/wavefront.py @@ -0,0 +1,100 @@ +# -*- coding: utf-8 -*- +# ----------------------------------------------------------------------------# +# Copyright © 2015-2016 VMware, Inc. All Rights Reserved. # +# # +# Licensed under the BSD 2-Clause License (the “License”); you may not use # +# this file except in compliance with the License. # +# # +# The BSD 2-Clause License # +# # +# Redistribution and use in source and binary forms, with or without # +# modification, are permitted provided that the following conditions are met:# +# # +# - Redistributions of source code must retain the above copyright notice, # +# this list of conditions and the following disclaimer. # +# # +# - Redistributions in binary form must reproduce the above copyright # +# notice, this list of conditions and the following disclaimer in the # +# documentation and/or other materials provided with the distribution. # +# # +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"# +# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE # +# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE # +# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE # +# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR # +# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF # +# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS # +# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN # +# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) # +# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF # +# THE POSSIBILITY OF SUCH DAMAGE. # +# ----------------------------------------------------------------------------# + +import logging +from liota.dccs.dcc import DataCenterComponent +from liota.entities.metrics.registered_metric import RegisteredMetric +from liota.entities.metrics.metric import Metric +from liota.entities.registered_entity import RegisteredEntity + +log = logging.getLogger(__name__) + +class Wavefront(DataCenterComponent): + def __init__(self, comms): + super(Wavefront, self).__init__( + comms=comms + ) + self.comms = comms + self.check = True + + def register(self, entity_obj): + log.info("Registering resource with Wavefront DCC {0}".format(entity_obj.name)) + if isinstance(entity_obj, Metric): + return RegisteredMetric(entity_obj, self, None) + else: + return RegisteredEntity(entity_obj, self, None) + + def create_relationship(self, reg_entity_parent, reg_entity_child): + #print "parent: ",reg_entity_parent.ref_entity.name,reg_entity_parent.ref_entity.entity_id + #print "child: ",reg_entity_child.ref_entity.name,reg_entity_child.ref_entity.entity_id + reg_entity_child.parent = reg_entity_parent + + def _format_data(self, reg_metric): + met_cnt = reg_metric.values.qsize() + message = '' + host = '' + device_name = '' + metric_name = '' + if met_cnt == 0: + return + for _ in range(met_cnt): + v = reg_metric.values.get(block=True) + if v is not None: + device_name = (reg_metric.parent).ref_entity.name + metric_name = reg_metric.ref_entity.name + if (reg_metric.parent).parent: + host = (reg_metric.parent).parent.ref_entity.entity_id+"."+(reg_metric.parent).ref_entity.entity_id + else: + host = (reg_metric.parent).ref_entity.entity_id #if device is not available, only gateway uuid + + metric_unit = str(reg_metric.ref_entity.unit) + metric_unit = ''.join(metric_unit.split()) + message += '{0},unit={5},host={1} {2}={3} {4}'.format(device_name,host,metric_name,v[1], + v[0]*1000000,metric_unit) + if self.check: + print "Device name: ",device_name + print "Metric name: ",metric_name + print "Host name: ",host + print "Message: ",message + self.check = False + + if message == '': + return + log.info ("Publishing values to Wavefront DCC") + log.debug("Formatted message: {0}".format(message)) + return message + + def set_properties(self, reg_entity, properties): + raise NotImplementedError + + def unregister(self, entity_obj): + raise NotImplementedError diff --git a/packages/examples/mqtt/wavefront/wavefront_bike_simulated.py b/packages/examples/mqtt/wavefront/wavefront_bike_simulated.py new file mode 100644 index 00000000..17d4022c --- /dev/null +++ b/packages/examples/mqtt/wavefront/wavefront_bike_simulated.py @@ -0,0 +1,188 @@ +# -*- coding: utf-8 -*- +# ----------------------------------------------------------------------------# +# Copyright © 2015-2016 VMware, Inc. All Rights Reserved. # +# # +# Licensed under the BSD 2-Clause License (the “License”); you may not use # +# this file except in compliance with the License. # +# # +# The BSD 2-Clause License # +# # +# Redistribution and use in source and binary forms, with or without # +# modification, are permitted provided that the following conditions are met:# +# # +# - Redistributions of source code must retain the above copyright notice, # +# this list of conditions and the following disclaimer. # +# # +# - Redistributions in binary form must reproduce the above copyright # +# notice, this list of conditions and the following disclaimer in the # +# documentation and/or other materials provided with the distribution. # +# # +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"# +# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE # +# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE # +# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE # +# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR # +# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF # +# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS # +# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN # +# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) # +# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF # +# THE POSSIBILITY OF SUCH DAMAGE. # +# ----------------------------------------------------------------------------# + +from liota.core.package_manager import LiotaPackage +from liota.lib.utilities.utility import read_user_config + +dependencies = ["wavefront", "examples/bike_simulator"] + + +def static_vars(**kwargs): + def decorate(func): + for k in kwargs: + setattr(func, k, kwargs[k]) + return func + return decorate + + +class PackageClass(LiotaPackage): + + def create_udm(self, bike_model): + ureg = bike_model.ureg + + import time + import math + + #------------------------------------------------------------------- + # The following functions operate on physical variables represented + # in pint objects, and returns a pint object, too. + # Decorators provided by the pint library are used to check the + # dimensions of arguments passed to the functions. + + @ureg.check(ureg.rpm, ureg.m) + def get_speed(revolution, radius): + return revolution * radius + + @ureg.check(ureg.m / ureg.sec) + @static_vars(speed_last=None, time_last=None) + def get_acceleration(speed): + t = time.time() + if get_acceleration.time_last is None: + acc = 0 * ureg.m / ureg.sec ** 2 + else: + acc = (speed - get_acceleration.speed_last) / \ + ((t - get_acceleration.time_last) * ureg.sec) + get_acceleration.speed_last = speed + get_acceleration.time_last = t + return acc + + @ureg.check(ureg.m ** 2, ureg.m / ureg.sec, ureg.kg / ureg.m ** 3) + def get_resistance(area, speed, k): + return (k * area * speed ** 2) + + @ureg.check(ureg.kg, ureg.m / ureg.sec ** 2) + def get_force(mass, acceleration): + return mass * acceleration + + @ureg.check(ureg.newton, ureg.m / ureg.sec) + def get_power(force, speed): + return force * speed + + #------------------------------------------------------------------- + # This is a sampling method, which queries the physical model, and + # calls the physical functions to calculate a desired variable. + + def get_bike_speed(): + speed = get_speed( + bike_model.get_revolution(), + bike_model.get_radius_wheel() + ).to(ureg.m / ureg.sec) + return speed.magnitude + + #------------------------------------------------------------------- + # This is a more complex sampling method, which queries the physical + # model. + + def get_bike_power(): + weight_total = bike_model.get_weight_bike() + \ + bike_model.get_weight_rider() + \ + bike_model.get_weight_load() + speed = get_speed( + bike_model.get_revolution(), + bike_model.get_radius_wheel() + ) + power_acceleration = get_power( + get_force( + weight_total, + get_acceleration(speed) + ), + speed + ).to(ureg.watt) + power_gravity = get_power( + get_force( + weight_total, + 9.8 * ureg.m / ureg.sec ** 2 + ), + speed * math.sin(bike_model.get_slope()) + ).to(ureg.watt) + power_resistance = get_power( + get_resistance( + bike_model.get_area(), + speed, + 10 * ureg.kg / ureg.m ** 3 + ).to(ureg.newton), + speed + ).to(ureg.watt) + power = power_acceleration + power_gravity + power_resistance + return power.to(ureg.watt).magnitude + + self.get_bike_speed = get_bike_speed + self.get_bike_power = get_bike_power + + def run(self, registry): + from liota.entities.metrics.metric import Metric + import copy + + # Acquire resources from registry + wavefront_edge_system = copy.copy(registry.get("wavefront_edge_system")) + wavefront = registry.get("wavefront") + bike_simulator = registry.get("bike_simulator") + wavefront_bike = wavefront.register(bike_simulator) + + wavefront.create_relationship(wavefront_edge_system, wavefront_bike) + + config_path = registry.get("package_conf") + config = read_user_config(config_path + '/sampleProp.conf') + + ureg = bike_simulator.ureg + self.create_udm(bike_model=bike_simulator) + + # Create metrics + self.metrics = [] + + metric_name = "speed" + bike_speed = Metric( + name=metric_name, + unit=(ureg.m/ureg.sec), + interval=2, + sampling_function=self.get_bike_speed + ) + reg_bike_speed = wavefront.register(bike_speed) + wavefront.create_relationship(wavefront_bike, reg_bike_speed) + reg_bike_speed.start_collecting() + self.metrics.append(reg_bike_speed) + + metric_name = "power" + bike_power = Metric( + name=metric_name, + unit=ureg.watt, + interval=2, + sampling_function=self.get_bike_power + ) + reg_bike_power = wavefront.register(bike_power) + wavefront.create_relationship(wavefront_bike, reg_bike_power) + reg_bike_power.start_collecting() + self.metrics.append(reg_bike_power) + + def clean_up(self): + for metric in self.metrics: + metric.stop_collecting() diff --git a/packages/examples/mqtt/wavefront/wavefront_edge_system_stats.py b/packages/examples/mqtt/wavefront/wavefront_edge_system_stats.py new file mode 100644 index 00000000..6d8b70ad --- /dev/null +++ b/packages/examples/mqtt/wavefront/wavefront_edge_system_stats.py @@ -0,0 +1,141 @@ +# -*- coding: utf-8 -*- +# ----------------------------------------------------------------------------# +# Copyright © 2015-2016 VMware, Inc. All Rights Reserved. # +# # +# Licensed under the BSD 2-Clause License (the “License”); you may not use # +# this file except in compliance with the License. # +# # +# The BSD 2-Clause License # +# # +# Redistribution and use in source and binary forms, with or without # +# modification, are permitted provided that the following conditions are met:# +# # +# - Redistributions of source code must retain the above copyright notice, # +# this list of conditions and the following disclaimer. # +# # +# - Redistributions in binary form must reproduce the above copyright # +# notice, this list of conditions and the following disclaimer in the # +# documentation and/or other materials provided with the distribution. # +# # +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"# +# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE # +# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE # +# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE # +# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR # +# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF # +# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS # +# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN # +# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) # +# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF # +# THE POSSIBILITY OF SUCH DAMAGE. # +# ----------------------------------------------------------------------------# + +from linux_metrics import cpu_stat, disk_stat, net_stat + + +from liota.core.package_manager import LiotaPackage +from liota.lib.utilities.utility import get_default_network_interface, get_disk_name, read_user_config +from liota.lib.utilities.utility import read_user_config + +dependencies = ["wavefront"] + +# Getting edge_system's network interface and disk name + +# There are situations where route may not actually return a default route in the +# main routing table, as the default route might be kept in another table. +# Such cases should be handled manually. +network_interface = get_default_network_interface() +# If edge_system has multiple disks, only first disk will be returned. +# Such cases should be handled manually. +disk_name = get_disk_name() + +# --------------------------------------------------------------------------- +# This is a sample application package to publish edge system stats data to +# wavefront using MQTT protocol as DCC Comms +# User defined methods + + +def read_cpu_procs(): + return cpu_stat.procs_running() + + +def read_cpu_utilization(sample_duration_sec=1): + cpu_pcts = cpu_stat.cpu_percents(sample_duration_sec) + return round((100 - cpu_pcts['idle']), 2) + + +def read_disk_usage_stats(): + return round(disk_stat.disk_reads_writes(disk_name)[0], 2) + + +def read_network_bytes_received(): + return round(net_stat.rx_tx_bytes(network_interface)[0], 2) + + +class PackageClass(LiotaPackage): + def run(self, registry): + import copy + from liota.entities.metrics.metric import Metric + from liota.lib.transports.mqtt import MqttMessagingAttributes + + # Acquire resources from registry + wavefront_edge_system = copy.copy(registry.get("wavefront_edge_system")) + wavefront = registry.get("wavefront") + + # Get values from configuration file + config_path = registry.get("package_conf") + config = read_user_config(config_path + '/sampleProp.conf') + + # Create metrics + self.metrics = [] + metric_name = "CPU-Utilization" + metric_cpu_utilization = Metric(name=metric_name, + unit=None, + interval=2, + aggregation_size=1, + sampling_function=read_cpu_utilization + ) + reg_metric_cpu_utilization = wavefront.register(metric_cpu_utilization) + wavefront.create_relationship(wavefront_edge_system, reg_metric_cpu_utilization) + reg_metric_cpu_utilization.start_collecting() + self.metrics.append(reg_metric_cpu_utilization) + + metric_name = "CPU-Process" + metric_cpu_procs = Metric(name=metric_name, + unit=None, + interval=2, + aggregation_size=1, + sampling_function=read_cpu_procs + ) + reg_metric_cpu_procs = wavefront.register(metric_cpu_procs) + wavefront.create_relationship(wavefront_edge_system, reg_metric_cpu_procs) + reg_metric_cpu_procs.start_collecting() + self.metrics.append(reg_metric_cpu_procs) + + metric_name = "Disk-Usage-Stats" + metric_disk_usage_stats = Metric(name=metric_name, + unit=None, + interval=2, + aggregation_size=1, + sampling_function=read_disk_usage_stats + ) + reg_metric_disk_usage_stats = wavefront.register(metric_disk_usage_stats) + wavefront.create_relationship(wavefront_edge_system, reg_metric_disk_usage_stats) + reg_metric_disk_usage_stats.start_collecting() + self.metrics.append(reg_metric_disk_usage_stats) + + metric_name = "Network-Bytes-Received" + metric_network_bytes_received = Metric(name=metric_name, + unit=None, + interval=2, + aggregation_size=1, + sampling_function=read_network_bytes_received + ) + reg_metric_network_bytes_received = wavefront.register(metric_network_bytes_received) + wavefront.create_relationship(wavefront_edge_system, reg_metric_network_bytes_received) + reg_metric_network_bytes_received.start_collecting() + self.metrics.append(reg_metric_network_bytes_received) + + def clean_up(self): + for metric in self.metrics: + metric.stop_collecting() diff --git a/packages/sampleProp.conf b/packages/sampleProp.conf index 3c1c1948..95b2276b 100644 --- a/packages/sampleProp.conf +++ b/packages/sampleProp.conf @@ -48,4 +48,3 @@ CustomPubTopic = "home/gateway/metrics" LivingRoomTemperatureTopic = "home/living-room/temperature" LivingRoomHumidityTopic = "home/living-room/humidity" LivingRoomLightTopic = "home/living-room/light" - diff --git a/packages/wavefront.py b/packages/wavefront.py new file mode 100644 index 00000000..c80b9ab1 --- /dev/null +++ b/packages/wavefront.py @@ -0,0 +1,86 @@ +# -*- coding: utf-8 -*- +# ----------------------------------------------------------------------------# +# Copyright © 2015-2016 VMware, Inc. All Rights Reserved. # +# # +# Licensed under the BSD 2-Clause License (the “License”); you may not use # +# this file except in compliance with the License. # +# # +# The BSD 2-Clause License # +# # +# Redistribution and use in source and binary forms, with or without # +# modification, are permitted provided that the following conditions are met:# +# # +# - Redistributions of source code must retain the above copyright notice, # +# this list of conditions and the following disclaimer. # +# # +# - Redistributions in binary form must reproduce the above copyright # +# notice, this list of conditions and the following disclaimer in the # +# documentation and/or other materials provided with the distribution. # +# # +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"# +# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE # +# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE # +# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE # +# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR # +# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF # +# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS # +# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN # +# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) # +# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF # +# THE POSSIBILITY OF SUCH DAMAGE. # +# ----------------------------------------------------------------------------# + +from liota.core.package_manager import LiotaPackage +from liota.lib.utilities.utility import read_user_config + +dependencies = ["edge_systems/dell5k/edge_system"] + + +class PackageClass(LiotaPackage): + """ + This package creates a Graphite DCC object and registers system on + Graphite to acquire "registered edge system", i.e. graphite_edge_system. + """ + + def run(self, registry): + import copy + from liota.dccs.wavefront import Wavefront + from liota.dcc_comms.mqtt_dcc_comms import MqttDccComms + from liota.lib.transports.mqtt import MqttMessagingAttributes + from liota.lib.transports.mqtt import QoSDetails + from liota.lib.utilities.identity import Identity + from liota.lib.utilities.tls_conf import TLSConf + + # Acquire resources from registry + # Creating a copy of system object to keep original object "clean" + edge_system = copy.copy(registry.get("edge_system")) + + # Get values from configuration file + config_path = registry.get("package_conf") + config = read_user_config(config_path + '/sampleProp.conf') + + # Initialize DCC object with transport + identity = Identity(root_ca_cert=config['broker_root_ca_cert'], username=config['broker_username'], password=['broker_password'], + cert_file=None, key_file=None) + # Encapsulate TLS parameters + tls_conf = TLSConf(config['cert_required'], config['tls_version'], config['cipher']) + # Encapsulate QoS related parameters + qos_details = QoSDetails(config['in_flight'], config['queue_size'], config['retry']) + + # Connecting to emqtt broker + self.wavefront = Wavefront(MqttDccComms(edge_system_name=edge_system.name, + url=config['BrokerIP'], port=config['BrokerPort'], identity=identity, + tls_conf=tls_conf, + qos_details=qos_details, + clean_session=True, + protocol=config['protocol'], transport=['transport'], + conn_disconn_timeout=config['ConnectDisconnectTimeout'])) + + # Register gateway system + wavefront_edge_system = self.wavefront.register(edge_system) + + registry.register("wavefront", self.wavefront) + registry.register("wavefront_edge_system", wavefront_edge_system) + + def clean_up(self): + self.wavefront.comms.client = None From f10d975730b785023eedeb2f65236c94a4ccd8d6 Mon Sep 17 00:00:00 2001 From: root Date: Fri, 22 Sep 2017 15:53:35 +0530 Subject: [PATCH 02/12] Wavefront DCC and wavefront example --- liota/dccs/wavefront.py | 11 +++-------- 1 file changed, 3 insertions(+), 8 deletions(-) diff --git a/liota/dccs/wavefront.py b/liota/dccs/wavefront.py index 6ba162b1..878b0cb0 100644 --- a/liota/dccs/wavefront.py +++ b/liota/dccs/wavefront.py @@ -44,7 +44,6 @@ def __init__(self, comms): comms=comms ) self.comms = comms - self.check = True def register(self, entity_obj): log.info("Registering resource with Wavefront DCC {0}".format(entity_obj.name)) @@ -80,16 +79,12 @@ def _format_data(self, reg_metric): metric_unit = ''.join(metric_unit.split()) message += '{0},unit={5},host={1} {2}={3} {4}'.format(device_name,host,metric_name,v[1], v[0]*1000000,metric_unit) - if self.check: - print "Device name: ",device_name - print "Metric name: ",metric_name - print "Host name: ",host - print "Message: ",message - self.check = False - if message == '': return log.info ("Publishing values to Wavefront DCC") + log.debug("Device name: {0}".format(device_name)) + log.debug("Metric name: {0}".format(metric_name)) + log.debug("Host name: {0}".format(host)) log.debug("Formatted message: {0}".format(message)) return message From d193948b1e1a4e8e6506d410b2c6536efb1a9e55 Mon Sep 17 00:00:00 2001 From: root Date: Fri, 22 Sep 2017 16:45:49 +0530 Subject: [PATCH 03/12] Wavefront,Influx DCC and packages for wavefront,influx --- liota/dccs/influx.py | 93 +++++++++ liota/dccs/wavefront.py | 2 - liota/lib/transports/mqtt.py | 1 + .../mqtt/influx/influx_bike_simulated.py | 188 ++++++++++++++++++ .../mqtt/influx/influx_edge_system_stats.py | 141 +++++++++++++ packages/influx.py | 86 ++++++++ 6 files changed, 509 insertions(+), 2 deletions(-) create mode 100644 liota/dccs/influx.py create mode 100644 packages/examples/mqtt/influx/influx_bike_simulated.py create mode 100644 packages/examples/mqtt/influx/influx_edge_system_stats.py create mode 100644 packages/influx.py diff --git a/liota/dccs/influx.py b/liota/dccs/influx.py new file mode 100644 index 00000000..b9a5b9c9 --- /dev/null +++ b/liota/dccs/influx.py @@ -0,0 +1,93 @@ +# -*- coding: utf-8 -*- +# ----------------------------------------------------------------------------# +# Copyright © 2015-2016 VMware, Inc. All Rights Reserved. # +# # +# Licensed under the BSD 2-Clause License (the “License”); you may not use # +# this file except in compliance with the License. # +# # +# The BSD 2-Clause License # +# # +# Redistribution and use in source and binary forms, with or without # +# modification, are permitted provided that the following conditions are met:# +# # +# - Redistributions of source code must retain the above copyright notice, # +# this list of conditions and the following disclaimer. # +# # +# - Redistributions in binary form must reproduce the above copyright # +# notice, this list of conditions and the following disclaimer in the # +# documentation and/or other materials provided with the distribution. # +# # +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"# +# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE # +# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE # +# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE # +# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR # +# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF # +# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS # +# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN # +# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) # +# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF # +# THE POSSIBILITY OF SUCH DAMAGE. # +# ----------------------------------------------------------------------------# + +import logging +from liota.dccs.dcc import DataCenterComponent +from liota.entities.metrics.registered_metric import RegisteredMetric +from liota.entities.metrics.metric import Metric +from liota.entities.registered_entity import RegisteredEntity + +log = logging.getLogger(__name__) + +class influx(DataCenterComponent): + def __init__(self, comms): + super(influx, self).__init__( + comms=comms + ) + self.comms = comms + + def register(self, entity_obj): + log.info("Registering resource with influx DCC {0}".format(entity_obj.name)) + if isinstance(entity_obj, Metric): + return RegisteredMetric(entity_obj, self, None) + else: + return RegisteredEntity(entity_obj, self, None) + + def create_relationship(self, reg_entity_parent, reg_entity_child): + reg_entity_child.parent = reg_entity_parent + + def _format_data(self, reg_metric): + met_cnt = reg_metric.values.qsize() + message = '' + host = '' + device_name = '' + metric_name = '' + if met_cnt == 0: + return + for _ in range(met_cnt): + v = reg_metric.values.get(block=True) + if v is not None: + device_name = (reg_metric.parent).ref_entity.name + metric_name = reg_metric.ref_entity.name + if (reg_metric.parent).parent: + host = (reg_metric.parent).parent.ref_entity.entity_id+"."+(reg_metric.parent).ref_entity.entity_id + else: + host = (reg_metric.parent).ref_entity.entity_id #if device is not available, only gateway uuid + + metric_unit = str(reg_metric.ref_entity.unit) + metric_unit = ''.join(metric_unit.split()) + message += '{0},unit={5},host={1} {2}={3} {4}'.format(device_name,host,metric_name,v[1], + v[0]*1000000,metric_unit) + if message == '': + return + log.info ("Publishing values to influx DCC") + log.debug("Device name: {0}".format(device_name)) + log.debug("Metric name: {0}".format(metric_name)) + log.debug("Host name: {0}".format(host)) + log.debug("Formatted message: {0}".format(message)) + return message + + def set_properties(self, reg_entity, properties): + raise NotImplementedError + + def unregister(self, entity_obj): + raise NotImplementedError diff --git a/liota/dccs/wavefront.py b/liota/dccs/wavefront.py index 878b0cb0..1332e227 100644 --- a/liota/dccs/wavefront.py +++ b/liota/dccs/wavefront.py @@ -53,8 +53,6 @@ def register(self, entity_obj): return RegisteredEntity(entity_obj, self, None) def create_relationship(self, reg_entity_parent, reg_entity_child): - #print "parent: ",reg_entity_parent.ref_entity.name,reg_entity_parent.ref_entity.entity_id - #print "child: ",reg_entity_child.ref_entity.name,reg_entity_child.ref_entity.entity_id reg_entity_child.parent = reg_entity_parent def _format_data(self, reg_metric): diff --git a/liota/lib/transports/mqtt.py b/liota/lib/transports/mqtt.py index 6f943224..3224ce89 100644 --- a/liota/lib/transports/mqtt.py +++ b/liota/lib/transports/mqtt.py @@ -305,6 +305,7 @@ def connect_soc(self): self._paho_client.message_retry_set(self.qos_details.retry) # Connect with MQTT Broker + self._paho_client.tls_insecure_set(True) self._paho_client.connect(host=self.url, port=self.port, keepalive=self.keep_alive) # Start network loop to handle auto-reconnect diff --git a/packages/examples/mqtt/influx/influx_bike_simulated.py b/packages/examples/mqtt/influx/influx_bike_simulated.py new file mode 100644 index 00000000..723c7fa5 --- /dev/null +++ b/packages/examples/mqtt/influx/influx_bike_simulated.py @@ -0,0 +1,188 @@ +# -*- coding: utf-8 -*- +# ----------------------------------------------------------------------------# +# Copyright © 2015-2016 VMware, Inc. All Rights Reserved. # +# # +# Licensed under the BSD 2-Clause License (the “License”); you may not use # +# this file except in compliance with the License. # +# # +# The BSD 2-Clause License # +# # +# Redistribution and use in source and binary forms, with or without # +# modification, are permitted provided that the following conditions are met:# +# # +# - Redistributions of source code must retain the above copyright notice, # +# this list of conditions and the following disclaimer. # +# # +# - Redistributions in binary form must reproduce the above copyright # +# notice, this list of conditions and the following disclaimer in the # +# documentation and/or other materials provided with the distribution. # +# # +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"# +# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE # +# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE # +# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE # +# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR # +# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF # +# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS # +# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN # +# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) # +# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF # +# THE POSSIBILITY OF SUCH DAMAGE. # +# ----------------------------------------------------------------------------# + +from liota.core.package_manager import LiotaPackage +from liota.lib.utilities.utility import read_user_config + +dependencies = ["influx", "examples/bike_simulator"] + + +def static_vars(**kwargs): + def decorate(func): + for k in kwargs: + setattr(func, k, kwargs[k]) + return func + return decorate + + +class PackageClass(LiotaPackage): + + def create_udm(self, bike_model): + ureg = bike_model.ureg + + import time + import math + + #------------------------------------------------------------------- + # The following functions operate on physical variables represented + # in pint objects, and returns a pint object, too. + # Decorators provided by the pint library are used to check the + # dimensions of arguments passed to the functions. + + @ureg.check(ureg.rpm, ureg.m) + def get_speed(revolution, radius): + return revolution * radius + + @ureg.check(ureg.m / ureg.sec) + @static_vars(speed_last=None, time_last=None) + def get_acceleration(speed): + t = time.time() + if get_acceleration.time_last is None: + acc = 0 * ureg.m / ureg.sec ** 2 + else: + acc = (speed - get_acceleration.speed_last) / \ + ((t - get_acceleration.time_last) * ureg.sec) + get_acceleration.speed_last = speed + get_acceleration.time_last = t + return acc + + @ureg.check(ureg.m ** 2, ureg.m / ureg.sec, ureg.kg / ureg.m ** 3) + def get_resistance(area, speed, k): + return (k * area * speed ** 2) + + @ureg.check(ureg.kg, ureg.m / ureg.sec ** 2) + def get_force(mass, acceleration): + return mass * acceleration + + @ureg.check(ureg.newton, ureg.m / ureg.sec) + def get_power(force, speed): + return force * speed + + #------------------------------------------------------------------- + # This is a sampling method, which queries the physical model, and + # calls the physical functions to calculate a desired variable. + + def get_bike_speed(): + speed = get_speed( + bike_model.get_revolution(), + bike_model.get_radius_wheel() + ).to(ureg.m / ureg.sec) + return speed.magnitude + + #------------------------------------------------------------------- + # This is a more complex sampling method, which queries the physical + # model. + + def get_bike_power(): + weight_total = bike_model.get_weight_bike() + \ + bike_model.get_weight_rider() + \ + bike_model.get_weight_load() + speed = get_speed( + bike_model.get_revolution(), + bike_model.get_radius_wheel() + ) + power_acceleration = get_power( + get_force( + weight_total, + get_acceleration(speed) + ), + speed + ).to(ureg.watt) + power_gravity = get_power( + get_force( + weight_total, + 9.8 * ureg.m / ureg.sec ** 2 + ), + speed * math.sin(bike_model.get_slope()) + ).to(ureg.watt) + power_resistance = get_power( + get_resistance( + bike_model.get_area(), + speed, + 10 * ureg.kg / ureg.m ** 3 + ).to(ureg.newton), + speed + ).to(ureg.watt) + power = power_acceleration + power_gravity + power_resistance + return power.to(ureg.watt).magnitude + + self.get_bike_speed = get_bike_speed + self.get_bike_power = get_bike_power + + def run(self, registry): + from liota.entities.metrics.metric import Metric + import copy + + # Acquire resources from registry + influx_edge_system = copy.copy(registry.get("influx_edge_system")) + influx = registry.get("influx") + bike_simulator = registry.get("bike_simulator") + influx_bike = influx.register(bike_simulator) + + influx.create_relationship(influx_edge_system, influx_bike) + + config_path = registry.get("package_conf") + config = read_user_config(config_path + '/sampleProp.conf') + + ureg = bike_simulator.ureg + self.create_udm(bike_model=bike_simulator) + + # Create metrics + self.metrics = [] + + metric_name = "speed" + bike_speed = Metric( + name=metric_name, + unit=(ureg.m/ureg.sec), + interval=2, + sampling_function=self.get_bike_speed + ) + reg_bike_speed = influx.register(bike_speed) + influx.create_relationship(influx_bike, reg_bike_speed) + reg_bike_speed.start_collecting() + self.metrics.append(reg_bike_speed) + + metric_name = "power" + bike_power = Metric( + name=metric_name, + unit=ureg.watt, + interval=2, + sampling_function=self.get_bike_power + ) + reg_bike_power = influx.register(bike_power) + influx.create_relationship(influx_bike, reg_bike_power) + reg_bike_power.start_collecting() + self.metrics.append(reg_bike_power) + + def clean_up(self): + for metric in self.metrics: + metric.stop_collecting() diff --git a/packages/examples/mqtt/influx/influx_edge_system_stats.py b/packages/examples/mqtt/influx/influx_edge_system_stats.py new file mode 100644 index 00000000..ec4a481a --- /dev/null +++ b/packages/examples/mqtt/influx/influx_edge_system_stats.py @@ -0,0 +1,141 @@ +# -*- coding: utf-8 -*- +# ----------------------------------------------------------------------------# +# Copyright © 2015-2016 VMware, Inc. All Rights Reserved. # +# # +# Licensed under the BSD 2-Clause License (the “License”); you may not use # +# this file except in compliance with the License. # +# # +# The BSD 2-Clause License # +# # +# Redistribution and use in source and binary forms, with or without # +# modification, are permitted provided that the following conditions are met:# +# # +# - Redistributions of source code must retain the above copyright notice, # +# this list of conditions and the following disclaimer. # +# # +# - Redistributions in binary form must reproduce the above copyright # +# notice, this list of conditions and the following disclaimer in the # +# documentation and/or other materials provided with the distribution. # +# # +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"# +# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE # +# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE # +# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE # +# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR # +# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF # +# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS # +# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN # +# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) # +# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF # +# THE POSSIBILITY OF SUCH DAMAGE. # +# ----------------------------------------------------------------------------# + +from linux_metrics import cpu_stat, disk_stat, net_stat + + +from liota.core.package_manager import LiotaPackage +from liota.lib.utilities.utility import get_default_network_interface, get_disk_name, read_user_config +from liota.lib.utilities.utility import read_user_config + +dependencies = ["influx"] + +# Getting edge_system's network interface and disk name + +# There are situations where route may not actually return a default route in the +# main routing table, as the default route might be kept in another table. +# Such cases should be handled manually. +network_interface = get_default_network_interface() +# If edge_system has multiple disks, only first disk will be returned. +# Such cases should be handled manually. +disk_name = get_disk_name() + +# --------------------------------------------------------------------------- +# This is a sample application package to publish edge system stats data to +# influx using MQTT protocol as DCC Comms +# User defined methods + + +def read_cpu_procs(): + return cpu_stat.procs_running() + + +def read_cpu_utilization(sample_duration_sec=1): + cpu_pcts = cpu_stat.cpu_percents(sample_duration_sec) + return round((100 - cpu_pcts['idle']), 2) + + +def read_disk_usage_stats(): + return round(disk_stat.disk_reads_writes(disk_name)[0], 2) + + +def read_network_bytes_received(): + return round(net_stat.rx_tx_bytes(network_interface)[0], 2) + + +class PackageClass(LiotaPackage): + def run(self, registry): + import copy + from liota.entities.metrics.metric import Metric + from liota.lib.transports.mqtt import MqttMessagingAttributes + + # Acquire resources from registry + influx_edge_system = copy.copy(registry.get("influx_edge_system")) + influx = registry.get("influx") + + # Get values from configuration file + config_path = registry.get("package_conf") + config = read_user_config(config_path + '/sampleProp.conf') + + # Create metrics + self.metrics = [] + metric_name = "CPU-Utilization" + metric_cpu_utilization = Metric(name=metric_name, + unit=None, + interval=2, + aggregation_size=1, + sampling_function=read_cpu_utilization + ) + reg_metric_cpu_utilization = influx.register(metric_cpu_utilization) + influx.create_relationship(influx_edge_system, reg_metric_cpu_utilization) + reg_metric_cpu_utilization.start_collecting() + self.metrics.append(reg_metric_cpu_utilization) + + metric_name = "CPU-Process" + metric_cpu_procs = Metric(name=metric_name, + unit=None, + interval=2, + aggregation_size=1, + sampling_function=read_cpu_procs + ) + reg_metric_cpu_procs = influx.register(metric_cpu_procs) + influx.create_relationship(influx_edge_system, reg_metric_cpu_procs) + reg_metric_cpu_procs.start_collecting() + self.metrics.append(reg_metric_cpu_procs) + + metric_name = "Disk-Usage-Stats" + metric_disk_usage_stats = Metric(name=metric_name, + unit=None, + interval=2, + aggregation_size=1, + sampling_function=read_disk_usage_stats + ) + reg_metric_disk_usage_stats = influx.register(metric_disk_usage_stats) + influx.create_relationship(influx_edge_system, reg_metric_disk_usage_stats) + reg_metric_disk_usage_stats.start_collecting() + self.metrics.append(reg_metric_disk_usage_stats) + + metric_name = "Network-Bytes-Received" + metric_network_bytes_received = Metric(name=metric_name, + unit=None, + interval=2, + aggregation_size=1, + sampling_function=read_network_bytes_received + ) + reg_metric_network_bytes_received = influx.register(metric_network_bytes_received) + influx.create_relationship(influx_edge_system, reg_metric_network_bytes_received) + reg_metric_network_bytes_received.start_collecting() + self.metrics.append(reg_metric_network_bytes_received) + + def clean_up(self): + for metric in self.metrics: + metric.stop_collecting() diff --git a/packages/influx.py b/packages/influx.py new file mode 100644 index 00000000..61f5f445 --- /dev/null +++ b/packages/influx.py @@ -0,0 +1,86 @@ +# -*- coding: utf-8 -*- +# ----------------------------------------------------------------------------# +# Copyright © 2015-2016 VMware, Inc. All Rights Reserved. # +# # +# Licensed under the BSD 2-Clause License (the “License”); you may not use # +# this file except in compliance with the License. # +# # +# The BSD 2-Clause License # +# # +# Redistribution and use in source and binary forms, with or without # +# modification, are permitted provided that the following conditions are met:# +# # +# - Redistributions of source code must retain the above copyright notice, # +# this list of conditions and the following disclaimer. # +# # +# - Redistributions in binary form must reproduce the above copyright # +# notice, this list of conditions and the following disclaimer in the # +# documentation and/or other materials provided with the distribution. # +# # +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"# +# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE # +# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE # +# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE # +# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR # +# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF # +# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS # +# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN # +# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) # +# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF # +# THE POSSIBILITY OF SUCH DAMAGE. # +# ----------------------------------------------------------------------------# + +from liota.core.package_manager import LiotaPackage +from liota.lib.utilities.utility import read_user_config + +dependencies = ["edge_systems/dell5k/edge_system"] + + +class PackageClass(LiotaPackage): + """ + This package creates a Graphite DCC object and registers system on + Graphite to acquire "registered edge system", i.e. graphite_edge_system. + """ + + def run(self, registry): + import copy + from liota.dccs.influx import influx + from liota.dcc_comms.mqtt_dcc_comms import MqttDccComms + from liota.lib.transports.mqtt import MqttMessagingAttributes + from liota.lib.transports.mqtt import QoSDetails + from liota.lib.utilities.identity import Identity + from liota.lib.utilities.tls_conf import TLSConf + + # Acquire resources from registry + # Creating a copy of system object to keep original object "clean" + edge_system = copy.copy(registry.get("edge_system")) + + # Get values from configuration file + config_path = registry.get("package_conf") + config = read_user_config(config_path + '/sampleProp.conf') + + # Initialize DCC object with transport + identity = Identity(root_ca_cert=config['broker_root_ca_cert'], username=config['broker_username'], password=['broker_password'], + cert_file=None, key_file=None) + # Encapsulate TLS parameters + tls_conf = TLSConf(config['cert_required'], config['tls_version'], config['cipher']) + # Encapsulate QoS related parameters + qos_details = QoSDetails(config['in_flight'], config['queue_size'], config['retry']) + + # Connecting to emqtt broker + self.influx = influx(MqttDccComms(edge_system_name=edge_system.name, + url=config['BrokerIP'], port=config['BrokerPort'], identity=identity, + tls_conf=tls_conf, + qos_details=qos_details, + clean_session=True, + protocol=config['protocol'], transport=['transport'], + conn_disconn_timeout=config['ConnectDisconnectTimeout'])) + + # Register gateway system + influx_edge_system = self.influx.register(edge_system) + + registry.register("influx", self.influx) + registry.register("influx_edge_system", influx_edge_system) + + def clean_up(self): + self.influx.comms.client = None From acd78d4340e141a8c8fa0cccba01c5c3108ed8c5 Mon Sep 17 00:00:00 2001 From: root Date: Fri, 22 Sep 2017 17:22:23 +0530 Subject: [PATCH 04/12] Added readme for Influx and Wavefront --- liota/lib/transports/mqtt.py | 1 - packages/examples/mqtt/influx/Readme.md | 14 ++++++++++++++ packages/examples/mqtt/wavefront/Readme.md | 15 +++++++++++++++ 3 files changed, 29 insertions(+), 1 deletion(-) create mode 100644 packages/examples/mqtt/influx/Readme.md create mode 100644 packages/examples/mqtt/wavefront/Readme.md diff --git a/liota/lib/transports/mqtt.py b/liota/lib/transports/mqtt.py index 3224ce89..6f943224 100644 --- a/liota/lib/transports/mqtt.py +++ b/liota/lib/transports/mqtt.py @@ -305,7 +305,6 @@ def connect_soc(self): self._paho_client.message_retry_set(self.qos_details.retry) # Connect with MQTT Broker - self._paho_client.tls_insecure_set(True) self._paho_client.connect(host=self.url, port=self.port, keepalive=self.keep_alive) # Start network loop to handle auto-reconnect diff --git a/packages/examples/mqtt/influx/Readme.md b/packages/examples/mqtt/influx/Readme.md new file mode 100644 index 00000000..590dbf71 --- /dev/null +++ b/packages/examples/mqtt/influx/Readme.md @@ -0,0 +1,14 @@ +# Influx +InfluxDB is a time series database built from the ground up to handle high write and query loads. It is the second piece of the TICK stack. InfluxDB is meant to be used as a backing store for any use case involving large amounts of timestamped data, including DevOps monitoring, application metrics, IoT sensor data, and real-time analytics. +# Requirements + - Mqtt Broker: Setup a mqtt broker which will allow liota to send metrics to the collector agent. + + - Telegraf: Telegraf is a plugin-driven server agent for collecting & reporting metrics. You can setup telegraf from [here.](https://docs.influxdata.com/telegraf/v1.4/) + +- InfluxDB: You can install and configure InfluxDb from [here.](https://docs.influxdata.com/influxdb/v1.3/introduction/) + +# Examples +Above examples are sending metrics data to the mqtt broker, from where you can consume data using telegraf agent which will then pass data to InfluxDB. + +You can also view data being sent to InfluxDB using Chronograf. You can setup chronograf from [here.](https://docs.influxdata.com/chronograf/v1.3/) + diff --git a/packages/examples/mqtt/wavefront/Readme.md b/packages/examples/mqtt/wavefront/Readme.md new file mode 100644 index 00000000..d37c5327 --- /dev/null +++ b/packages/examples/mqtt/wavefront/Readme.md @@ -0,0 +1,15 @@ +# Wavefront +[![N|Solid](https://docs.wavefront.com/images/wavefront_architecture_lb.png)](https://docs.wavefront.com/images/wavefront_architecture_lb.png) +Wavefront is a high performance streaming analytics platform designed for monitoring and optimization. The service is unique in its ability to scale to very high data ingestion rates and query loads. +# Requirements + - Mqtt Broker: Setup a mqtt broker which will allow liota to send metrics to the collector agent. + + - Collector Agent: Collector agents collect metrics from monitored systems and send them to the Wavefront proxy. Monitored systems can include hosts, containers, and many different types of applications. Wavefront supports many standard collector agents, including Telegraf, Docker cAdvisor, and others. You can check the integration of collector agent [here.](https://docs.wavefront.com/integrations.html) + +- Wavefront Proxy: The Wavefront proxy allows you to send your data to Wavefront in a secure, fast, and reliable manner. The proxy works with the Wavefront server to ensure end-to-end flow control. + +# Examples +Above examples are sending metrics data to the mqtt broker, from where you can consume data using any collector agent which will then pass data to wavefront proxy to be displayed in wavefront. + +You can learn how to get data into wavefront [here.](https://docs.wavefront.com/tutorial_data_ingestion.html) + From 65747361ae43a27f16463d6aeacf92844d6edb9d Mon Sep 17 00:00:00 2001 From: root Date: Fri, 22 Sep 2017 17:31:24 +0530 Subject: [PATCH 05/12] Added readme for Influx and Wavefront --- packages/examples/mqtt/wavefront/Readme.md | 1 + 1 file changed, 1 insertion(+) diff --git a/packages/examples/mqtt/wavefront/Readme.md b/packages/examples/mqtt/wavefront/Readme.md index d37c5327..d36b3ddd 100644 --- a/packages/examples/mqtt/wavefront/Readme.md +++ b/packages/examples/mqtt/wavefront/Readme.md @@ -1,5 +1,6 @@ # Wavefront [![N|Solid](https://docs.wavefront.com/images/wavefront_architecture_lb.png)](https://docs.wavefront.com/images/wavefront_architecture_lb.png) + Wavefront is a high performance streaming analytics platform designed for monitoring and optimization. The service is unique in its ability to scale to very high data ingestion rates and query loads. # Requirements - Mqtt Broker: Setup a mqtt broker which will allow liota to send metrics to the collector agent. From afa162d44e10fc8f975bfddb3a019d34c8c692c9 Mon Sep 17 00:00:00 2001 From: root Date: Mon, 25 Sep 2017 18:33:22 +0530 Subject: [PATCH 06/12] Added opentsdb dcc and packages --- liota/dccs/opentsdb.py | 92 ++++++++++ packages/examples/opentsdb_bike_simulated.py | 180 +++++++++++++++++++ packages/opentsdb.py | 71 ++++++++ 3 files changed, 343 insertions(+) create mode 100644 liota/dccs/opentsdb.py create mode 100644 packages/examples/opentsdb_bike_simulated.py create mode 100644 packages/opentsdb.py diff --git a/liota/dccs/opentsdb.py b/liota/dccs/opentsdb.py new file mode 100644 index 00000000..2d0d111c --- /dev/null +++ b/liota/dccs/opentsdb.py @@ -0,0 +1,92 @@ +# -*- coding: utf-8 -*- +# ----------------------------------------------------------------------------# +# Copyright © 2015-2016 VMware, Inc. All Rights Reserved. # +# # +# Licensed under the BSD 2-Clause License (the “License”); you may not use # +# this file except in compliance with the License. # +# # +# The BSD 2-Clause License # +# # +# Redistribution and use in source and binary forms, with or without # +# modification, are permitted provided that the following conditions are met:# +# # +# - Redistributions of source code must retain the above copyright notice, # +# this list of conditions and the following disclaimer. # +# # +# - Redistributions in binary form must reproduce the above copyright # +# notice, this list of conditions and the following disclaimer in the # +# documentation and/or other materials provided with the distribution. # +# # +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"# +# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE # +# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE # +# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE # +# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR # +# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF # +# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS # +# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN # +# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) # +# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF # +# THE POSSIBILITY OF SUCH DAMAGE. # +# ----------------------------------------------------------------------------# + +import logging +from liota.dccs.dcc import DataCenterComponent +from liota.entities.metrics.registered_metric import RegisteredMetric +from liota.entities.metrics.metric import Metric +from liota.entities.registered_entity import RegisteredEntity +from datetime import datetime + +log = logging.getLogger(__name__) + +class opentsdb(DataCenterComponent): + def __init__(self, comms): + super(opentsdb, self).__init__( + comms=comms + ) + + def register(self, entity_obj): + log.info("Registering resource with opentsdb DCC {0}".format(entity_obj.name)) + if isinstance(entity_obj, Metric): + return RegisteredMetric(entity_obj, self, None) + else: + return RegisteredEntity(entity_obj, self, None) + + def create_relationship(self, reg_entity_parent, reg_entity_child): + reg_entity_child.parent = reg_entity_parent + + def _format_data(self, reg_metric): + met_cnt = reg_metric.values.qsize() + message = '' + host = '' + device_name = '' + metric_name = '' + if met_cnt == 0: + return + for _ in range(met_cnt): + v = reg_metric.values.get(block=True) + if v is not None: + device_name = (reg_metric.parent).ref_entity.name + metric_name = reg_metric.ref_entity.name + host = (reg_metric.parent).ref_entity.entity_id + + metric_unit = str(reg_metric.ref_entity.unit) + metric_unit = ''.join(metric_unit.split()) + x = long((datetime.now() - datetime(1970, 1, 1)).total_seconds() * 1000) + message += 'put {0} {1} {2} host={3}\n'.format(metric_name,x,v[0],host) + print "host: ",host + print "message: ",message + if message == '': + return + log.info ("Publishing values to opentsdb DCC") + log.debug("Device name: {0}".format(device_name)) + log.debug("Metric name: {0}".format(metric_name)) + log.debug("Host name: {0}".format(host)) + log.debug("Formatted message: {0}".format(message)) + return message + + def set_properties(self, reg_entity, properties): + raise NotImplementedError + + def unregister(self, entity_obj): + raise NotImplementedError diff --git a/packages/examples/opentsdb_bike_simulated.py b/packages/examples/opentsdb_bike_simulated.py new file mode 100644 index 00000000..4bebc9c5 --- /dev/null +++ b/packages/examples/opentsdb_bike_simulated.py @@ -0,0 +1,180 @@ +# -*- coding: utf-8 -*- +# ----------------------------------------------------------------------------# +# Copyright © 2015-2016 VMware, Inc. All Rights Reserved. # +# # +# Licensed under the BSD 2-Clause License (the “License”); you may not use # +# this file except in compliance with the License. # +# # +# The BSD 2-Clause License # +# # +# Redistribution and use in source and binary forms, with or without # +# modification, are permitted provided that the following conditions are met:# +# # +# - Redistributions of source code must retain the above copyright notice, # +# this list of conditions and the following disclaimer. # +# # +# - Redistributions in binary form must reproduce the above copyright # +# notice, this list of conditions and the following disclaimer in the # +# documentation and/or other materials provided with the distribution. # +# # +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"# +# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE # +# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE # +# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE # +# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR # +# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF # +# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS # +# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN # +# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) # +# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF # +# THE POSSIBILITY OF SUCH DAMAGE. # +# ----------------------------------------------------------------------------# + +from liota.core.package_manager import LiotaPackage + +dependencies = ["opentsdb", "examples/bike_simulator"] + + +def static_vars(**kwargs): + def decorate(func): + for k in kwargs: + setattr(func, k, kwargs[k]) + return func + return decorate + + +class PackageClass(LiotaPackage): + + def create_udm(self, bike_model): + ureg = bike_model.ureg + + import time + import math + + #------------------------------------------------------------------- + # The following functions operate on physical variables represented + # in pint objects, and returns a pint object, too. + # Decorators provided by the pint library are used to check the + # dimensions of arguments passed to the functions. + + @ureg.check(ureg.rpm, ureg.m) + def get_speed(revolution, radius): + return revolution * radius + + @ureg.check(ureg.m / ureg.sec) + @static_vars(speed_last=None, time_last=None) + def get_acceleration(speed): + t = time.time() + if get_acceleration.time_last is None: + acc = 0 * ureg.m / ureg.sec ** 2 + else: + acc = (speed - get_acceleration.speed_last) / \ + ((t - get_acceleration.time_last) * ureg.sec) + get_acceleration.speed_last = speed + get_acceleration.time_last = t + return acc + + @ureg.check(ureg.m ** 2, ureg.m / ureg.sec, ureg.kg / ureg.m ** 3) + def get_resistance(area, speed, k): + return (k * area * speed ** 2) + + @ureg.check(ureg.kg, ureg.m / ureg.sec ** 2) + def get_force(mass, acceleration): + return mass * acceleration + + @ureg.check(ureg.newton, ureg.m / ureg.sec) + def get_power(force, speed): + return force * speed + + #------------------------------------------------------------------- + # This is a sampling method, which queries the physical model, and + # calls the physical functions to calculate a desired variable. + + def get_bike_speed(): + speed = get_speed( + bike_model.get_revolution(), + bike_model.get_radius_wheel() + ).to(ureg.m / ureg.sec) + return speed.magnitude + + #------------------------------------------------------------------- + # This is a more complex sampling method, which queries the physical + # model. + + def get_bike_power(): + weight_total = bike_model.get_weight_bike() + \ + bike_model.get_weight_rider() + \ + bike_model.get_weight_load() + speed = get_speed( + bike_model.get_revolution(), + bike_model.get_radius_wheel() + ) + power_acceleration = get_power( + get_force( + weight_total, + get_acceleration(speed) + ), + speed + ).to(ureg.watt) + power_gravity = get_power( + get_force( + weight_total, + 9.8 * ureg.m / ureg.sec ** 2 + ), + speed * math.sin(bike_model.get_slope()) + ).to(ureg.watt) + power_resistance = get_power( + get_resistance( + bike_model.get_area(), + speed, + 10 * ureg.kg / ureg.m ** 3 + ).to(ureg.newton), + speed + ).to(ureg.watt) + power = power_acceleration + power_gravity + power_resistance + return power.to(ureg.watt).magnitude + + self.get_bike_speed = get_bike_speed + self.get_bike_power = get_bike_power + + def run(self, registry): + from liota.entities.metrics.metric import Metric + + # Acquire resources from registry + opentsdb = registry.get("opentsdb") + bike_simulator = registry.get("bike_simulator") + opentsdb_bike = opentsdb.register(bike_simulator) + + ureg = bike_simulator.ureg + self.create_udm(bike_model=bike_simulator) + + # Create metrics + self.metrics = [] + + metric_name = "speed" + bike_speed = Metric( + name=metric_name, + unit=(ureg.m / ureg.sec), + interval=5, + sampling_function=self.get_bike_speed + ) + reg_bike_speed = opentsdb.register(bike_speed) + opentsdb.create_relationship(opentsdb_bike, reg_bike_speed) + reg_bike_speed.start_collecting() + self.metrics.append(reg_bike_speed) + + metric_name = "power" + bike_power = Metric( + name=metric_name, + unit=ureg.watt, + interval=5, + sampling_function=self.get_bike_power + ) + reg_bike_power = opentsdb.register(bike_power) + opentsdb.create_relationship(opentsdb_bike, reg_bike_power) + reg_bike_power.start_collecting() + self.metrics.append(reg_bike_power) + + def clean_up(self): + for metric in self.metrics: + metric.stop_collecting() diff --git a/packages/opentsdb.py b/packages/opentsdb.py new file mode 100644 index 00000000..4410a1e9 --- /dev/null +++ b/packages/opentsdb.py @@ -0,0 +1,71 @@ +# -*- coding: utf-8 -*- +# ----------------------------------------------------------------------------# +# Copyright © 2015-2016 VMware, Inc. All Rights Reserved. # +# # +# Licensed under the BSD 2-Clause License (the “License”); you may not use # +# this file except in compliance with the License. # +# # +# The BSD 2-Clause License # +# # +# Redistribution and use in source and binary forms, with or without # +# modification, are permitted provided that the following conditions are met:# +# # +# - Redistributions of source code must retain the above copyright notice, # +# this list of conditions and the following disclaimer. # +# # +# - Redistributions in binary form must reproduce the above copyright # +# notice, this list of conditions and the following disclaimer in the # +# documentation and/or other materials provided with the distribution. # +# # +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"# +# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE # +# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE # +# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE # +# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR # +# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF # +# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS # +# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN # +# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) # +# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF # +# THE POSSIBILITY OF SUCH DAMAGE. # +# ----------------------------------------------------------------------------# + +from liota.core.package_manager import LiotaPackage +from liota.lib.utilities.utility import read_user_config + +dependencies = ["edge_systems/dell5k/edge_system"] + + +class PackageClass(LiotaPackage): + """ + This package creates a opentsdb DCC object and registers system on + opentsdb to acquire "registered edge system", i.e. opentsdb_edge_system. + """ + + def run(self, registry): + import copy + from liota.dccs.opentsdb import opentsdb + from liota.dcc_comms.socket_comms import SocketDccComms + + # Acquire resources from registry + # Creating a copy of system object to keep original object "clean" + edge_system = copy.copy(registry.get("edge_system")) + + # Get values from configuration file + config_path = registry.get("package_conf") + config = read_user_config(config_path + '/sampleProp.conf') + + # Initialize DCC object with transport + self.opentsdb = opentsdb( + SocketDccComms(ip=config['opentsdbIP'], + port=config['opentsdbPort']) + ) + + # Register gateway system + opentsdb_edge_system = self.opentsdb.register(edge_system) + + registry.register("opentsdb", self.opentsdb) + registry.register("opentsdb_edge_system", opentsdb_edge_system) + + def clean_up(self): + self.opentsdb.comms.client.close() From 2ee192e0cda3a1fc30883838eba00f5f580b3cae Mon Sep 17 00:00:00 2001 From: root Date: Wed, 27 Sep 2017 12:40:18 +0530 Subject: [PATCH 07/12] opentsdb update --- liota/dccs/opentsdb.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/liota/dccs/opentsdb.py b/liota/dccs/opentsdb.py index 2d0d111c..34abfadc 100644 --- a/liota/dccs/opentsdb.py +++ b/liota/dccs/opentsdb.py @@ -72,8 +72,8 @@ def _format_data(self, reg_metric): metric_unit = str(reg_metric.ref_entity.unit) metric_unit = ''.join(metric_unit.split()) - x = long((datetime.now() - datetime(1970, 1, 1)).total_seconds() * 1000) - message += 'put {0} {1} {2} host={3}\n'.format(metric_name,x,v[0],host) + x = long((datetime.utcnow() - datetime(1970, 1, 1)).total_seconds() * 1000) + message += 'put {0} {1} {2} host={3}\n'.format(metric_name,x,v[1],host) print "host: ",host print "message: ",message if message == '': From cc05efd1524e4b614e8bf94fdc3c33fc79db3cb8 Mon Sep 17 00:00:00 2001 From: root Date: Wed, 27 Sep 2017 12:43:50 +0530 Subject: [PATCH 08/12] opentsdb update --- packages/sampleProp.conf | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/sampleProp.conf b/packages/sampleProp.conf index 95b2276b..c3fdc49b 100644 --- a/packages/sampleProp.conf +++ b/packages/sampleProp.conf @@ -47,4 +47,4 @@ retry = 5 CustomPubTopic = "home/gateway/metrics" LivingRoomTemperatureTopic = "home/living-room/temperature" LivingRoomHumidityTopic = "home/living-room/humidity" -LivingRoomLightTopic = "home/living-room/light" +LivingRoomLightTopic = "home/living-room/light" \ No newline at end of file From 958563acabef7976f41d146c131057a21210fb6e Mon Sep 17 00:00:00 2001 From: root Date: Wed, 27 Sep 2017 15:48:58 +0530 Subject: [PATCH 09/12] opentsdb update --- liota/dccs/opentsdb.py | 5 +- .../examples/opentsdb_edge_system_stats.py | 141 ++++++++++++++++++ packages/sampleProp.conf | 5 + 3 files changed, 147 insertions(+), 4 deletions(-) create mode 100644 packages/examples/opentsdb_edge_system_stats.py diff --git a/liota/dccs/opentsdb.py b/liota/dccs/opentsdb.py index 34abfadc..76f054d1 100644 --- a/liota/dccs/opentsdb.py +++ b/liota/dccs/opentsdb.py @@ -72,10 +72,7 @@ def _format_data(self, reg_metric): metric_unit = str(reg_metric.ref_entity.unit) metric_unit = ''.join(metric_unit.split()) - x = long((datetime.utcnow() - datetime(1970, 1, 1)).total_seconds() * 1000) - message += 'put {0} {1} {2} host={3}\n'.format(metric_name,x,v[1],host) - print "host: ",host - print "message: ",message + message += 'put {0} {1} {2} host={3}\n'.format(metric_name,v[0],v[1],host) if message == '': return log.info ("Publishing values to opentsdb DCC") diff --git a/packages/examples/opentsdb_edge_system_stats.py b/packages/examples/opentsdb_edge_system_stats.py new file mode 100644 index 00000000..7eaa9f94 --- /dev/null +++ b/packages/examples/opentsdb_edge_system_stats.py @@ -0,0 +1,141 @@ +# -*- coding: utf-8 -*- +# ----------------------------------------------------------------------------# +# Copyright © 2015-2016 VMware, Inc. All Rights Reserved. # +# # +# Licensed under the BSD 2-Clause License (the “License”); you may not use # +# this file except in compliance with the License. # +# # +# The BSD 2-Clause License # +# # +# Redistribution and use in source and binary forms, with or without # +# modification, are permitted provided that the following conditions are met:# +# # +# - Redistributions of source code must retain the above copyright notice, # +# this list of conditions and the following disclaimer. # +# # +# - Redistributions in binary form must reproduce the above copyright # +# notice, this list of conditions and the following disclaimer in the # +# documentation and/or other materials provided with the distribution. # +# # +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"# +# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE # +# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE # +# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE # +# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR # +# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF # +# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS # +# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN # +# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) # +# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF # +# THE POSSIBILITY OF SUCH DAMAGE. # +# ----------------------------------------------------------------------------# + +from linux_metrics import cpu_stat, disk_stat, net_stat + + +from liota.core.package_manager import LiotaPackage +from liota.lib.utilities.utility import get_default_network_interface, get_disk_name, read_user_config +from liota.lib.utilities.utility import read_user_config + +dependencies = ["opentsdb"] + +# Getting edge_system's network interface and disk name + +# There are situations where route may not actually return a default route in the +# main routing table, as the default route might be kept in another table. +# Such cases should be handled manually. +network_interface = get_default_network_interface() +# If edge_system has multiple disks, only first disk will be returned. +# Such cases should be handled manually. +disk_name = get_disk_name() + +# --------------------------------------------------------------------------- +# This is a sample application package to publish edge system stats data to +# opentsdb using MQTT protocol as DCC Comms +# User defined methods + + +def read_cpu_procs(): + return cpu_stat.procs_running() + + +def read_cpu_utilization(sample_duration_sec=1): + cpu_pcts = cpu_stat.cpu_percents(sample_duration_sec) + return round((100 - cpu_pcts['idle']), 2) + + +def read_disk_usage_stats(): + return round(disk_stat.disk_reads_writes(disk_name)[0], 2) + + +def read_network_bytes_received(): + return round(net_stat.rx_tx_bytes(network_interface)[0], 2) + + +class PackageClass(LiotaPackage): + def run(self, registry): + import copy + from liota.entities.metrics.metric import Metric + from liota.lib.transports.mqtt import MqttMessagingAttributes + + # Acquire resources from registry + opentsdb_edge_system = copy.copy(registry.get("opentsdb_edge_system")) + opentsdb = registry.get("opentsdb") + + # Get values from configuration file + config_path = registry.get("package_conf") + config = read_user_config(config_path + '/sampleProp.conf') + + # Create metrics + self.metrics = [] + metric_name = "CPU-Utilization" + metric_cpu_utilization = Metric(name=metric_name, + unit=None, + interval=2, + aggregation_size=1, + sampling_function=read_cpu_utilization + ) + reg_metric_cpu_utilization = opentsdb.register(metric_cpu_utilization) + opentsdb.create_relationship(opentsdb_edge_system, reg_metric_cpu_utilization) + reg_metric_cpu_utilization.start_collecting() + self.metrics.append(reg_metric_cpu_utilization) + + metric_name = "CPU-Process" + metric_cpu_procs = Metric(name=metric_name, + unit=None, + interval=2, + aggregation_size=1, + sampling_function=read_cpu_procs + ) + reg_metric_cpu_procs = opentsdb.register(metric_cpu_procs) + opentsdb.create_relationship(opentsdb_edge_system, reg_metric_cpu_procs) + reg_metric_cpu_procs.start_collecting() + self.metrics.append(reg_metric_cpu_procs) + + metric_name = "Disk-Usage-Stats" + metric_disk_usage_stats = Metric(name=metric_name, + unit=None, + interval=2, + aggregation_size=1, + sampling_function=read_disk_usage_stats + ) + reg_metric_disk_usage_stats = opentsdb.register(metric_disk_usage_stats) + opentsdb.create_relationship(opentsdb_edge_system, reg_metric_disk_usage_stats) + reg_metric_disk_usage_stats.start_collecting() + self.metrics.append(reg_metric_disk_usage_stats) + + metric_name = "Network-Bytes-Received" + metric_network_bytes_received = Metric(name=metric_name, + unit=None, + interval=2, + aggregation_size=1, + sampling_function=read_network_bytes_received + ) + reg_metric_network_bytes_received = opentsdb.register(metric_network_bytes_received) + opentsdb.create_relationship(opentsdb_edge_system, reg_metric_network_bytes_received) + reg_metric_network_bytes_received.start_collecting() + self.metrics.append(reg_metric_network_bytes_received) + + def clean_up(self): + for metric in self.metrics: + metric.stop_collecting() diff --git a/packages/sampleProp.conf b/packages/sampleProp.conf index c3fdc49b..86b77935 100644 --- a/packages/sampleProp.conf +++ b/packages/sampleProp.conf @@ -25,6 +25,11 @@ ClientCertFile = None GraphiteIP = "Graphite-IP" GraphitePort = None +#### [opentsdb] #### + +opentsdbIP = "opentsdb-IP" +opentsdbPort = None + #### [GENERICMQTT] #### BrokerIP = "Broker Name or IP" From 9edbc564bfb285b8c904cdde493e576f458a3bc5 Mon Sep 17 00:00:00 2001 From: root Date: Wed, 27 Sep 2017 15:50:25 +0530 Subject: [PATCH 10/12] opentsdb update --- liota/dccs/opentsdb.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/liota/dccs/opentsdb.py b/liota/dccs/opentsdb.py index 76f054d1..30d83ea1 100644 --- a/liota/dccs/opentsdb.py +++ b/liota/dccs/opentsdb.py @@ -70,8 +70,6 @@ def _format_data(self, reg_metric): metric_name = reg_metric.ref_entity.name host = (reg_metric.parent).ref_entity.entity_id - metric_unit = str(reg_metric.ref_entity.unit) - metric_unit = ''.join(metric_unit.split()) message += 'put {0} {1} {2} host={3}\n'.format(metric_name,v[0],v[1],host) if message == '': return From 1702fbf4a77732917d3c2d8b5d0f02a13f425b90 Mon Sep 17 00:00:00 2001 From: root Date: Wed, 27 Sep 2017 16:04:08 +0530 Subject: [PATCH 11/12] opentsdb readme added --- packages/examples/README.md | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/packages/examples/README.md b/packages/examples/README.md index c32644b8..fb5ee8aa 100644 --- a/packages/examples/README.md +++ b/packages/examples/README.md @@ -1,2 +1,13 @@ Here we put example user packages for developers to refer to when they develop their own packages. + +# OpenTSDB +OpenTSDB is a scalable time series database built on top of Hadoop and HBase. It simplifies the process of storing and analyzing large amounts of time-series data generated by endpoints like sensors or servers. + +# Requirements + - OpenTSDB - One can install OpenTSDB from [here](http://opentsdb.net/docs/build/html/installation.html) or use the docker image provided [here.](http://opentsdb.net/docs/build/html/resources.html#docker-images) + +# Examples +Above OpenTSDB examples are sending metrics data to the opentsdb. You can specify the OpenTSDB-IP and port in the sampleProp.conf file provided in the packages folder. + + From 2eb083b990fae705e13941b436663780e5cddaa8 Mon Sep 17 00:00:00 2001 From: root Date: Wed, 8 Nov 2017 14:59:46 +0530 Subject: [PATCH 12/12] sampleProp edit --- examples/sampleProp.conf | 27 ++++++++++++++++++- .../mqtt/influx/{Readme.md => README.md} | 0 .../mqtt/wavefront/{Readme.md => README.md} | 0 packages/sampleProp.conf | 2 +- 4 files changed, 27 insertions(+), 2 deletions(-) rename packages/examples/mqtt/influx/{Readme.md => README.md} (100%) rename packages/examples/mqtt/wavefront/{Readme.md => README.md} (100%) diff --git a/examples/sampleProp.conf b/examples/sampleProp.conf index 1906d364..1dfc86d4 100644 --- a/examples/sampleProp.conf +++ b/examples/sampleProp.conf @@ -1,4 +1,4 @@ -[DEFAULT] +#### [DEFAULT] #### EdgeSystemName = "EdgeSystem-Name" DeviceName = "Device-Name" @@ -14,6 +14,7 @@ DevicePropList = {"Country":"USA-G", "State":"California", "City":"Palo Alto", " WebSocketUrl = "Websocket-address-url" IotCCUID = "Username" IotCCPassword = "Password" +ShouldUnregisterOnUnload = "False" WebsocketCaCertFile = "/etc/liota/websocket/cacert.pem" VerifyServerCert = True ClientKeyFile = None @@ -23,3 +24,27 @@ ClientCertFile = None GraphiteIP = "Graphite-IP" GraphitePort = None + +#### [GENERICMQTT] #### + +BrokerIP = "Broker Name or IP" +BrokerPort = 8883 +broker_username = "Username" +broker_password = "Password" +cert_required = "CERT_REQUIRED" +tls_version = "PROTOCOL_TLSv1_2" +broker_root_ca_cert = "/etc/liota/mqtt/cacert.pem" +edge_system_cert_file = None +edge_system_key_file = None +cipher = None +protocol = "MQTTv311" +transport = "tcp" +keep_alive = 60 +ConnectDisconnectTimeout = 10 +in_flight = 20 +queue_size = 0 +retry = 5 +CustomPubTopic = "home/gateway/metrics" +LivingRoomTemperatureTopic = "home/living-room/temperature" +LivingRoomHumidityTopic = "home/living-room/humidity" +LivingRoomLightTopic = "home/living-room/light" diff --git a/packages/examples/mqtt/influx/Readme.md b/packages/examples/mqtt/influx/README.md similarity index 100% rename from packages/examples/mqtt/influx/Readme.md rename to packages/examples/mqtt/influx/README.md diff --git a/packages/examples/mqtt/wavefront/Readme.md b/packages/examples/mqtt/wavefront/README.md similarity index 100% rename from packages/examples/mqtt/wavefront/Readme.md rename to packages/examples/mqtt/wavefront/README.md diff --git a/packages/sampleProp.conf b/packages/sampleProp.conf index 86b77935..70e85d59 100644 --- a/packages/sampleProp.conf +++ b/packages/sampleProp.conf @@ -52,4 +52,4 @@ retry = 5 CustomPubTopic = "home/gateway/metrics" LivingRoomTemperatureTopic = "home/living-room/temperature" LivingRoomHumidityTopic = "home/living-room/humidity" -LivingRoomLightTopic = "home/living-room/light" \ No newline at end of file +LivingRoomLightTopic = "home/living-room/light"