diff --git a/examples/sampleProp.conf b/examples/sampleProp.conf index 1906d364..1dfc86d4 100644 --- a/examples/sampleProp.conf +++ b/examples/sampleProp.conf @@ -1,4 +1,4 @@ -[DEFAULT] +#### [DEFAULT] #### EdgeSystemName = "EdgeSystem-Name" DeviceName = "Device-Name" @@ -14,6 +14,7 @@ DevicePropList = {"Country":"USA-G", "State":"California", "City":"Palo Alto", " WebSocketUrl = "Websocket-address-url" IotCCUID = "Username" IotCCPassword = "Password" +ShouldUnregisterOnUnload = "False" WebsocketCaCertFile = "/etc/liota/websocket/cacert.pem" VerifyServerCert = True ClientKeyFile = None @@ -23,3 +24,27 @@ ClientCertFile = None GraphiteIP = "Graphite-IP" GraphitePort = None + +#### [GENERICMQTT] #### + +BrokerIP = "Broker Name or IP" +BrokerPort = 8883 +broker_username = "Username" +broker_password = "Password" +cert_required = "CERT_REQUIRED" +tls_version = "PROTOCOL_TLSv1_2" +broker_root_ca_cert = "/etc/liota/mqtt/cacert.pem" +edge_system_cert_file = None +edge_system_key_file = None +cipher = None +protocol = "MQTTv311" +transport = "tcp" +keep_alive = 60 +ConnectDisconnectTimeout = 10 +in_flight = 20 +queue_size = 0 +retry = 5 +CustomPubTopic = "home/gateway/metrics" +LivingRoomTemperatureTopic = "home/living-room/temperature" +LivingRoomHumidityTopic = "home/living-room/humidity" +LivingRoomLightTopic = "home/living-room/light" diff --git a/liota/dccs/influx.py b/liota/dccs/influx.py new file mode 100644 index 00000000..b9a5b9c9 --- /dev/null +++ b/liota/dccs/influx.py @@ -0,0 +1,93 @@ +# -*- coding: utf-8 -*- +# ----------------------------------------------------------------------------# +# Copyright © 2015-2016 VMware, Inc. All Rights Reserved. # +# # +# Licensed under the BSD 2-Clause License (the “License”); you may not use # +# this file except in compliance with the License. # +# # +# The BSD 2-Clause License # +# # +# Redistribution and use in source and binary forms, with or without # +# modification, are permitted provided that the following conditions are met:# +# # +# - Redistributions of source code must retain the above copyright notice, # +# this list of conditions and the following disclaimer. # +# # +# - Redistributions in binary form must reproduce the above copyright # +# notice, this list of conditions and the following disclaimer in the # +# documentation and/or other materials provided with the distribution. # +# # +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"# +# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE # +# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE # +# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE # +# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR # +# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF # +# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS # +# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN # +# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) # +# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF # +# THE POSSIBILITY OF SUCH DAMAGE. # +# ----------------------------------------------------------------------------# + +import logging +from liota.dccs.dcc import DataCenterComponent +from liota.entities.metrics.registered_metric import RegisteredMetric +from liota.entities.metrics.metric import Metric +from liota.entities.registered_entity import RegisteredEntity + +log = logging.getLogger(__name__) + +class influx(DataCenterComponent): + def __init__(self, comms): + super(influx, self).__init__( + comms=comms + ) + self.comms = comms + + def register(self, entity_obj): + log.info("Registering resource with influx DCC {0}".format(entity_obj.name)) + if isinstance(entity_obj, Metric): + return RegisteredMetric(entity_obj, self, None) + else: + return RegisteredEntity(entity_obj, self, None) + + def create_relationship(self, reg_entity_parent, reg_entity_child): + reg_entity_child.parent = reg_entity_parent + + def _format_data(self, reg_metric): + met_cnt = reg_metric.values.qsize() + message = '' + host = '' + device_name = '' + metric_name = '' + if met_cnt == 0: + return + for _ in range(met_cnt): + v = reg_metric.values.get(block=True) + if v is not None: + device_name = (reg_metric.parent).ref_entity.name + metric_name = reg_metric.ref_entity.name + if (reg_metric.parent).parent: + host = (reg_metric.parent).parent.ref_entity.entity_id+"."+(reg_metric.parent).ref_entity.entity_id + else: + host = (reg_metric.parent).ref_entity.entity_id #if device is not available, only gateway uuid + + metric_unit = str(reg_metric.ref_entity.unit) + metric_unit = ''.join(metric_unit.split()) + message += '{0},unit={5},host={1} {2}={3} {4}'.format(device_name,host,metric_name,v[1], + v[0]*1000000,metric_unit) + if message == '': + return + log.info ("Publishing values to influx DCC") + log.debug("Device name: {0}".format(device_name)) + log.debug("Metric name: {0}".format(metric_name)) + log.debug("Host name: {0}".format(host)) + log.debug("Formatted message: {0}".format(message)) + return message + + def set_properties(self, reg_entity, properties): + raise NotImplementedError + + def unregister(self, entity_obj): + raise NotImplementedError diff --git a/liota/dccs/opentsdb.py b/liota/dccs/opentsdb.py new file mode 100644 index 00000000..30d83ea1 --- /dev/null +++ b/liota/dccs/opentsdb.py @@ -0,0 +1,87 @@ +# -*- coding: utf-8 -*- +# ----------------------------------------------------------------------------# +# Copyright © 2015-2016 VMware, Inc. All Rights Reserved. # +# # +# Licensed under the BSD 2-Clause License (the “License”); you may not use # +# this file except in compliance with the License. # +# # +# The BSD 2-Clause License # +# # +# Redistribution and use in source and binary forms, with or without # +# modification, are permitted provided that the following conditions are met:# +# # +# - Redistributions of source code must retain the above copyright notice, # +# this list of conditions and the following disclaimer. # +# # +# - Redistributions in binary form must reproduce the above copyright # +# notice, this list of conditions and the following disclaimer in the # +# documentation and/or other materials provided with the distribution. # +# # +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"# +# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE # +# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE # +# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE # +# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR # +# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF # +# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS # +# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN # +# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) # +# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF # +# THE POSSIBILITY OF SUCH DAMAGE. # +# ----------------------------------------------------------------------------# + +import logging +from liota.dccs.dcc import DataCenterComponent +from liota.entities.metrics.registered_metric import RegisteredMetric +from liota.entities.metrics.metric import Metric +from liota.entities.registered_entity import RegisteredEntity +from datetime import datetime + +log = logging.getLogger(__name__) + +class opentsdb(DataCenterComponent): + def __init__(self, comms): + super(opentsdb, self).__init__( + comms=comms + ) + + def register(self, entity_obj): + log.info("Registering resource with opentsdb DCC {0}".format(entity_obj.name)) + if isinstance(entity_obj, Metric): + return RegisteredMetric(entity_obj, self, None) + else: + return RegisteredEntity(entity_obj, self, None) + + def create_relationship(self, reg_entity_parent, reg_entity_child): + reg_entity_child.parent = reg_entity_parent + + def _format_data(self, reg_metric): + met_cnt = reg_metric.values.qsize() + message = '' + host = '' + device_name = '' + metric_name = '' + if met_cnt == 0: + return + for _ in range(met_cnt): + v = reg_metric.values.get(block=True) + if v is not None: + device_name = (reg_metric.parent).ref_entity.name + metric_name = reg_metric.ref_entity.name + host = (reg_metric.parent).ref_entity.entity_id + + message += 'put {0} {1} {2} host={3}\n'.format(metric_name,v[0],v[1],host) + if message == '': + return + log.info ("Publishing values to opentsdb DCC") + log.debug("Device name: {0}".format(device_name)) + log.debug("Metric name: {0}".format(metric_name)) + log.debug("Host name: {0}".format(host)) + log.debug("Formatted message: {0}".format(message)) + return message + + def set_properties(self, reg_entity, properties): + raise NotImplementedError + + def unregister(self, entity_obj): + raise NotImplementedError diff --git a/liota/dccs/wavefront.py b/liota/dccs/wavefront.py new file mode 100644 index 00000000..1332e227 --- /dev/null +++ b/liota/dccs/wavefront.py @@ -0,0 +1,93 @@ +# -*- coding: utf-8 -*- +# ----------------------------------------------------------------------------# +# Copyright © 2015-2016 VMware, Inc. All Rights Reserved. # +# # +# Licensed under the BSD 2-Clause License (the “License”); you may not use # +# this file except in compliance with the License. # +# # +# The BSD 2-Clause License # +# # +# Redistribution and use in source and binary forms, with or without # +# modification, are permitted provided that the following conditions are met:# +# # +# - Redistributions of source code must retain the above copyright notice, # +# this list of conditions and the following disclaimer. # +# # +# - Redistributions in binary form must reproduce the above copyright # +# notice, this list of conditions and the following disclaimer in the # +# documentation and/or other materials provided with the distribution. # +# # +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"# +# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE # +# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE # +# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE # +# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR # +# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF # +# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS # +# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN # +# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) # +# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF # +# THE POSSIBILITY OF SUCH DAMAGE. # +# ----------------------------------------------------------------------------# + +import logging +from liota.dccs.dcc import DataCenterComponent +from liota.entities.metrics.registered_metric import RegisteredMetric +from liota.entities.metrics.metric import Metric +from liota.entities.registered_entity import RegisteredEntity + +log = logging.getLogger(__name__) + +class Wavefront(DataCenterComponent): + def __init__(self, comms): + super(Wavefront, self).__init__( + comms=comms + ) + self.comms = comms + + def register(self, entity_obj): + log.info("Registering resource with Wavefront DCC {0}".format(entity_obj.name)) + if isinstance(entity_obj, Metric): + return RegisteredMetric(entity_obj, self, None) + else: + return RegisteredEntity(entity_obj, self, None) + + def create_relationship(self, reg_entity_parent, reg_entity_child): + reg_entity_child.parent = reg_entity_parent + + def _format_data(self, reg_metric): + met_cnt = reg_metric.values.qsize() + message = '' + host = '' + device_name = '' + metric_name = '' + if met_cnt == 0: + return + for _ in range(met_cnt): + v = reg_metric.values.get(block=True) + if v is not None: + device_name = (reg_metric.parent).ref_entity.name + metric_name = reg_metric.ref_entity.name + if (reg_metric.parent).parent: + host = (reg_metric.parent).parent.ref_entity.entity_id+"."+(reg_metric.parent).ref_entity.entity_id + else: + host = (reg_metric.parent).ref_entity.entity_id #if device is not available, only gateway uuid + + metric_unit = str(reg_metric.ref_entity.unit) + metric_unit = ''.join(metric_unit.split()) + message += '{0},unit={5},host={1} {2}={3} {4}'.format(device_name,host,metric_name,v[1], + v[0]*1000000,metric_unit) + if message == '': + return + log.info ("Publishing values to Wavefront DCC") + log.debug("Device name: {0}".format(device_name)) + log.debug("Metric name: {0}".format(metric_name)) + log.debug("Host name: {0}".format(host)) + log.debug("Formatted message: {0}".format(message)) + return message + + def set_properties(self, reg_entity, properties): + raise NotImplementedError + + def unregister(self, entity_obj): + raise NotImplementedError diff --git a/packages/examples/README.md b/packages/examples/README.md index c32644b8..fb5ee8aa 100644 --- a/packages/examples/README.md +++ b/packages/examples/README.md @@ -1,2 +1,13 @@ Here we put example user packages for developers to refer to when they develop their own packages. + +# OpenTSDB +OpenTSDB is a scalable time series database built on top of Hadoop and HBase. It simplifies the process of storing and analyzing large amounts of time-series data generated by endpoints like sensors or servers. + +# Requirements + - OpenTSDB - One can install OpenTSDB from [here](http://opentsdb.net/docs/build/html/installation.html) or use the docker image provided [here.](http://opentsdb.net/docs/build/html/resources.html#docker-images) + +# Examples +Above OpenTSDB examples are sending metrics data to the opentsdb. You can specify the OpenTSDB-IP and port in the sampleProp.conf file provided in the packages folder. + + diff --git a/packages/examples/mqtt/influx/README.md b/packages/examples/mqtt/influx/README.md new file mode 100644 index 00000000..590dbf71 --- /dev/null +++ b/packages/examples/mqtt/influx/README.md @@ -0,0 +1,14 @@ +# Influx +InfluxDB is a time series database built from the ground up to handle high write and query loads. It is the second piece of the TICK stack. InfluxDB is meant to be used as a backing store for any use case involving large amounts of timestamped data, including DevOps monitoring, application metrics, IoT sensor data, and real-time analytics. +# Requirements + - Mqtt Broker: Setup a mqtt broker which will allow liota to send metrics to the collector agent. + + - Telegraf: Telegraf is a plugin-driven server agent for collecting & reporting metrics. You can setup telegraf from [here.](https://docs.influxdata.com/telegraf/v1.4/) + +- InfluxDB: You can install and configure InfluxDb from [here.](https://docs.influxdata.com/influxdb/v1.3/introduction/) + +# Examples +Above examples are sending metrics data to the mqtt broker, from where you can consume data using telegraf agent which will then pass data to InfluxDB. + +You can also view data being sent to InfluxDB using Chronograf. You can setup chronograf from [here.](https://docs.influxdata.com/chronograf/v1.3/) + diff --git a/packages/examples/mqtt/influx/influx_bike_simulated.py b/packages/examples/mqtt/influx/influx_bike_simulated.py new file mode 100644 index 00000000..723c7fa5 --- /dev/null +++ b/packages/examples/mqtt/influx/influx_bike_simulated.py @@ -0,0 +1,188 @@ +# -*- coding: utf-8 -*- +# ----------------------------------------------------------------------------# +# Copyright © 2015-2016 VMware, Inc. All Rights Reserved. # +# # +# Licensed under the BSD 2-Clause License (the “License”); you may not use # +# this file except in compliance with the License. # +# # +# The BSD 2-Clause License # +# # +# Redistribution and use in source and binary forms, with or without # +# modification, are permitted provided that the following conditions are met:# +# # +# - Redistributions of source code must retain the above copyright notice, # +# this list of conditions and the following disclaimer. # +# # +# - Redistributions in binary form must reproduce the above copyright # +# notice, this list of conditions and the following disclaimer in the # +# documentation and/or other materials provided with the distribution. # +# # +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"# +# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE # +# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE # +# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE # +# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR # +# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF # +# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS # +# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN # +# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) # +# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF # +# THE POSSIBILITY OF SUCH DAMAGE. # +# ----------------------------------------------------------------------------# + +from liota.core.package_manager import LiotaPackage +from liota.lib.utilities.utility import read_user_config + +dependencies = ["influx", "examples/bike_simulator"] + + +def static_vars(**kwargs): + def decorate(func): + for k in kwargs: + setattr(func, k, kwargs[k]) + return func + return decorate + + +class PackageClass(LiotaPackage): + + def create_udm(self, bike_model): + ureg = bike_model.ureg + + import time + import math + + #------------------------------------------------------------------- + # The following functions operate on physical variables represented + # in pint objects, and returns a pint object, too. + # Decorators provided by the pint library are used to check the + # dimensions of arguments passed to the functions. + + @ureg.check(ureg.rpm, ureg.m) + def get_speed(revolution, radius): + return revolution * radius + + @ureg.check(ureg.m / ureg.sec) + @static_vars(speed_last=None, time_last=None) + def get_acceleration(speed): + t = time.time() + if get_acceleration.time_last is None: + acc = 0 * ureg.m / ureg.sec ** 2 + else: + acc = (speed - get_acceleration.speed_last) / \ + ((t - get_acceleration.time_last) * ureg.sec) + get_acceleration.speed_last = speed + get_acceleration.time_last = t + return acc + + @ureg.check(ureg.m ** 2, ureg.m / ureg.sec, ureg.kg / ureg.m ** 3) + def get_resistance(area, speed, k): + return (k * area * speed ** 2) + + @ureg.check(ureg.kg, ureg.m / ureg.sec ** 2) + def get_force(mass, acceleration): + return mass * acceleration + + @ureg.check(ureg.newton, ureg.m / ureg.sec) + def get_power(force, speed): + return force * speed + + #------------------------------------------------------------------- + # This is a sampling method, which queries the physical model, and + # calls the physical functions to calculate a desired variable. + + def get_bike_speed(): + speed = get_speed( + bike_model.get_revolution(), + bike_model.get_radius_wheel() + ).to(ureg.m / ureg.sec) + return speed.magnitude + + #------------------------------------------------------------------- + # This is a more complex sampling method, which queries the physical + # model. + + def get_bike_power(): + weight_total = bike_model.get_weight_bike() + \ + bike_model.get_weight_rider() + \ + bike_model.get_weight_load() + speed = get_speed( + bike_model.get_revolution(), + bike_model.get_radius_wheel() + ) + power_acceleration = get_power( + get_force( + weight_total, + get_acceleration(speed) + ), + speed + ).to(ureg.watt) + power_gravity = get_power( + get_force( + weight_total, + 9.8 * ureg.m / ureg.sec ** 2 + ), + speed * math.sin(bike_model.get_slope()) + ).to(ureg.watt) + power_resistance = get_power( + get_resistance( + bike_model.get_area(), + speed, + 10 * ureg.kg / ureg.m ** 3 + ).to(ureg.newton), + speed + ).to(ureg.watt) + power = power_acceleration + power_gravity + power_resistance + return power.to(ureg.watt).magnitude + + self.get_bike_speed = get_bike_speed + self.get_bike_power = get_bike_power + + def run(self, registry): + from liota.entities.metrics.metric import Metric + import copy + + # Acquire resources from registry + influx_edge_system = copy.copy(registry.get("influx_edge_system")) + influx = registry.get("influx") + bike_simulator = registry.get("bike_simulator") + influx_bike = influx.register(bike_simulator) + + influx.create_relationship(influx_edge_system, influx_bike) + + config_path = registry.get("package_conf") + config = read_user_config(config_path + '/sampleProp.conf') + + ureg = bike_simulator.ureg + self.create_udm(bike_model=bike_simulator) + + # Create metrics + self.metrics = [] + + metric_name = "speed" + bike_speed = Metric( + name=metric_name, + unit=(ureg.m/ureg.sec), + interval=2, + sampling_function=self.get_bike_speed + ) + reg_bike_speed = influx.register(bike_speed) + influx.create_relationship(influx_bike, reg_bike_speed) + reg_bike_speed.start_collecting() + self.metrics.append(reg_bike_speed) + + metric_name = "power" + bike_power = Metric( + name=metric_name, + unit=ureg.watt, + interval=2, + sampling_function=self.get_bike_power + ) + reg_bike_power = influx.register(bike_power) + influx.create_relationship(influx_bike, reg_bike_power) + reg_bike_power.start_collecting() + self.metrics.append(reg_bike_power) + + def clean_up(self): + for metric in self.metrics: + metric.stop_collecting() diff --git a/packages/examples/mqtt/influx/influx_edge_system_stats.py b/packages/examples/mqtt/influx/influx_edge_system_stats.py new file mode 100644 index 00000000..ec4a481a --- /dev/null +++ b/packages/examples/mqtt/influx/influx_edge_system_stats.py @@ -0,0 +1,141 @@ +# -*- coding: utf-8 -*- +# ----------------------------------------------------------------------------# +# Copyright © 2015-2016 VMware, Inc. All Rights Reserved. # +# # +# Licensed under the BSD 2-Clause License (the “License”); you may not use # +# this file except in compliance with the License. # +# # +# The BSD 2-Clause License # +# # +# Redistribution and use in source and binary forms, with or without # +# modification, are permitted provided that the following conditions are met:# +# # +# - Redistributions of source code must retain the above copyright notice, # +# this list of conditions and the following disclaimer. # +# # +# - Redistributions in binary form must reproduce the above copyright # +# notice, this list of conditions and the following disclaimer in the # +# documentation and/or other materials provided with the distribution. # +# # +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"# +# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE # +# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE # +# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE # +# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR # +# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF # +# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS # +# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN # +# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) # +# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF # +# THE POSSIBILITY OF SUCH DAMAGE. # +# ----------------------------------------------------------------------------# + +from linux_metrics import cpu_stat, disk_stat, net_stat + + +from liota.core.package_manager import LiotaPackage +from liota.lib.utilities.utility import get_default_network_interface, get_disk_name, read_user_config +from liota.lib.utilities.utility import read_user_config + +dependencies = ["influx"] + +# Getting edge_system's network interface and disk name + +# There are situations where route may not actually return a default route in the +# main routing table, as the default route might be kept in another table. +# Such cases should be handled manually. +network_interface = get_default_network_interface() +# If edge_system has multiple disks, only first disk will be returned. +# Such cases should be handled manually. +disk_name = get_disk_name() + +# --------------------------------------------------------------------------- +# This is a sample application package to publish edge system stats data to +# influx using MQTT protocol as DCC Comms +# User defined methods + + +def read_cpu_procs(): + return cpu_stat.procs_running() + + +def read_cpu_utilization(sample_duration_sec=1): + cpu_pcts = cpu_stat.cpu_percents(sample_duration_sec) + return round((100 - cpu_pcts['idle']), 2) + + +def read_disk_usage_stats(): + return round(disk_stat.disk_reads_writes(disk_name)[0], 2) + + +def read_network_bytes_received(): + return round(net_stat.rx_tx_bytes(network_interface)[0], 2) + + +class PackageClass(LiotaPackage): + def run(self, registry): + import copy + from liota.entities.metrics.metric import Metric + from liota.lib.transports.mqtt import MqttMessagingAttributes + + # Acquire resources from registry + influx_edge_system = copy.copy(registry.get("influx_edge_system")) + influx = registry.get("influx") + + # Get values from configuration file + config_path = registry.get("package_conf") + config = read_user_config(config_path + '/sampleProp.conf') + + # Create metrics + self.metrics = [] + metric_name = "CPU-Utilization" + metric_cpu_utilization = Metric(name=metric_name, + unit=None, + interval=2, + aggregation_size=1, + sampling_function=read_cpu_utilization + ) + reg_metric_cpu_utilization = influx.register(metric_cpu_utilization) + influx.create_relationship(influx_edge_system, reg_metric_cpu_utilization) + reg_metric_cpu_utilization.start_collecting() + self.metrics.append(reg_metric_cpu_utilization) + + metric_name = "CPU-Process" + metric_cpu_procs = Metric(name=metric_name, + unit=None, + interval=2, + aggregation_size=1, + sampling_function=read_cpu_procs + ) + reg_metric_cpu_procs = influx.register(metric_cpu_procs) + influx.create_relationship(influx_edge_system, reg_metric_cpu_procs) + reg_metric_cpu_procs.start_collecting() + self.metrics.append(reg_metric_cpu_procs) + + metric_name = "Disk-Usage-Stats" + metric_disk_usage_stats = Metric(name=metric_name, + unit=None, + interval=2, + aggregation_size=1, + sampling_function=read_disk_usage_stats + ) + reg_metric_disk_usage_stats = influx.register(metric_disk_usage_stats) + influx.create_relationship(influx_edge_system, reg_metric_disk_usage_stats) + reg_metric_disk_usage_stats.start_collecting() + self.metrics.append(reg_metric_disk_usage_stats) + + metric_name = "Network-Bytes-Received" + metric_network_bytes_received = Metric(name=metric_name, + unit=None, + interval=2, + aggregation_size=1, + sampling_function=read_network_bytes_received + ) + reg_metric_network_bytes_received = influx.register(metric_network_bytes_received) + influx.create_relationship(influx_edge_system, reg_metric_network_bytes_received) + reg_metric_network_bytes_received.start_collecting() + self.metrics.append(reg_metric_network_bytes_received) + + def clean_up(self): + for metric in self.metrics: + metric.stop_collecting() diff --git a/packages/examples/mqtt/wavefront/README.md b/packages/examples/mqtt/wavefront/README.md new file mode 100644 index 00000000..d36b3ddd --- /dev/null +++ b/packages/examples/mqtt/wavefront/README.md @@ -0,0 +1,16 @@ +# Wavefront +[![N|Solid](https://docs.wavefront.com/images/wavefront_architecture_lb.png)](https://docs.wavefront.com/images/wavefront_architecture_lb.png) + +Wavefront is a high performance streaming analytics platform designed for monitoring and optimization. The service is unique in its ability to scale to very high data ingestion rates and query loads. +# Requirements + - Mqtt Broker: Setup a mqtt broker which will allow liota to send metrics to the collector agent. + + - Collector Agent: Collector agents collect metrics from monitored systems and send them to the Wavefront proxy. Monitored systems can include hosts, containers, and many different types of applications. Wavefront supports many standard collector agents, including Telegraf, Docker cAdvisor, and others. You can check the integration of collector agent [here.](https://docs.wavefront.com/integrations.html) + +- Wavefront Proxy: The Wavefront proxy allows you to send your data to Wavefront in a secure, fast, and reliable manner. The proxy works with the Wavefront server to ensure end-to-end flow control. + +# Examples +Above examples are sending metrics data to the mqtt broker, from where you can consume data using any collector agent which will then pass data to wavefront proxy to be displayed in wavefront. + +You can learn how to get data into wavefront [here.](https://docs.wavefront.com/tutorial_data_ingestion.html) + diff --git a/packages/examples/mqtt/wavefront/wavefront_bike_simulated.py b/packages/examples/mqtt/wavefront/wavefront_bike_simulated.py new file mode 100644 index 00000000..17d4022c --- /dev/null +++ b/packages/examples/mqtt/wavefront/wavefront_bike_simulated.py @@ -0,0 +1,188 @@ +# -*- coding: utf-8 -*- +# ----------------------------------------------------------------------------# +# Copyright © 2015-2016 VMware, Inc. All Rights Reserved. # +# # +# Licensed under the BSD 2-Clause License (the “License”); you may not use # +# this file except in compliance with the License. # +# # +# The BSD 2-Clause License # +# # +# Redistribution and use in source and binary forms, with or without # +# modification, are permitted provided that the following conditions are met:# +# # +# - Redistributions of source code must retain the above copyright notice, # +# this list of conditions and the following disclaimer. # +# # +# - Redistributions in binary form must reproduce the above copyright # +# notice, this list of conditions and the following disclaimer in the # +# documentation and/or other materials provided with the distribution. # +# # +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"# +# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE # +# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE # +# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE # +# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR # +# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF # +# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS # +# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN # +# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) # +# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF # +# THE POSSIBILITY OF SUCH DAMAGE. # +# ----------------------------------------------------------------------------# + +from liota.core.package_manager import LiotaPackage +from liota.lib.utilities.utility import read_user_config + +dependencies = ["wavefront", "examples/bike_simulator"] + + +def static_vars(**kwargs): + def decorate(func): + for k in kwargs: + setattr(func, k, kwargs[k]) + return func + return decorate + + +class PackageClass(LiotaPackage): + + def create_udm(self, bike_model): + ureg = bike_model.ureg + + import time + import math + + #------------------------------------------------------------------- + # The following functions operate on physical variables represented + # in pint objects, and returns a pint object, too. + # Decorators provided by the pint library are used to check the + # dimensions of arguments passed to the functions. + + @ureg.check(ureg.rpm, ureg.m) + def get_speed(revolution, radius): + return revolution * radius + + @ureg.check(ureg.m / ureg.sec) + @static_vars(speed_last=None, time_last=None) + def get_acceleration(speed): + t = time.time() + if get_acceleration.time_last is None: + acc = 0 * ureg.m / ureg.sec ** 2 + else: + acc = (speed - get_acceleration.speed_last) / \ + ((t - get_acceleration.time_last) * ureg.sec) + get_acceleration.speed_last = speed + get_acceleration.time_last = t + return acc + + @ureg.check(ureg.m ** 2, ureg.m / ureg.sec, ureg.kg / ureg.m ** 3) + def get_resistance(area, speed, k): + return (k * area * speed ** 2) + + @ureg.check(ureg.kg, ureg.m / ureg.sec ** 2) + def get_force(mass, acceleration): + return mass * acceleration + + @ureg.check(ureg.newton, ureg.m / ureg.sec) + def get_power(force, speed): + return force * speed + + #------------------------------------------------------------------- + # This is a sampling method, which queries the physical model, and + # calls the physical functions to calculate a desired variable. + + def get_bike_speed(): + speed = get_speed( + bike_model.get_revolution(), + bike_model.get_radius_wheel() + ).to(ureg.m / ureg.sec) + return speed.magnitude + + #------------------------------------------------------------------- + # This is a more complex sampling method, which queries the physical + # model. + + def get_bike_power(): + weight_total = bike_model.get_weight_bike() + \ + bike_model.get_weight_rider() + \ + bike_model.get_weight_load() + speed = get_speed( + bike_model.get_revolution(), + bike_model.get_radius_wheel() + ) + power_acceleration = get_power( + get_force( + weight_total, + get_acceleration(speed) + ), + speed + ).to(ureg.watt) + power_gravity = get_power( + get_force( + weight_total, + 9.8 * ureg.m / ureg.sec ** 2 + ), + speed * math.sin(bike_model.get_slope()) + ).to(ureg.watt) + power_resistance = get_power( + get_resistance( + bike_model.get_area(), + speed, + 10 * ureg.kg / ureg.m ** 3 + ).to(ureg.newton), + speed + ).to(ureg.watt) + power = power_acceleration + power_gravity + power_resistance + return power.to(ureg.watt).magnitude + + self.get_bike_speed = get_bike_speed + self.get_bike_power = get_bike_power + + def run(self, registry): + from liota.entities.metrics.metric import Metric + import copy + + # Acquire resources from registry + wavefront_edge_system = copy.copy(registry.get("wavefront_edge_system")) + wavefront = registry.get("wavefront") + bike_simulator = registry.get("bike_simulator") + wavefront_bike = wavefront.register(bike_simulator) + + wavefront.create_relationship(wavefront_edge_system, wavefront_bike) + + config_path = registry.get("package_conf") + config = read_user_config(config_path + '/sampleProp.conf') + + ureg = bike_simulator.ureg + self.create_udm(bike_model=bike_simulator) + + # Create metrics + self.metrics = [] + + metric_name = "speed" + bike_speed = Metric( + name=metric_name, + unit=(ureg.m/ureg.sec), + interval=2, + sampling_function=self.get_bike_speed + ) + reg_bike_speed = wavefront.register(bike_speed) + wavefront.create_relationship(wavefront_bike, reg_bike_speed) + reg_bike_speed.start_collecting() + self.metrics.append(reg_bike_speed) + + metric_name = "power" + bike_power = Metric( + name=metric_name, + unit=ureg.watt, + interval=2, + sampling_function=self.get_bike_power + ) + reg_bike_power = wavefront.register(bike_power) + wavefront.create_relationship(wavefront_bike, reg_bike_power) + reg_bike_power.start_collecting() + self.metrics.append(reg_bike_power) + + def clean_up(self): + for metric in self.metrics: + metric.stop_collecting() diff --git a/packages/examples/mqtt/wavefront/wavefront_edge_system_stats.py b/packages/examples/mqtt/wavefront/wavefront_edge_system_stats.py new file mode 100644 index 00000000..6d8b70ad --- /dev/null +++ b/packages/examples/mqtt/wavefront/wavefront_edge_system_stats.py @@ -0,0 +1,141 @@ +# -*- coding: utf-8 -*- +# ----------------------------------------------------------------------------# +# Copyright © 2015-2016 VMware, Inc. All Rights Reserved. # +# # +# Licensed under the BSD 2-Clause License (the “License”); you may not use # +# this file except in compliance with the License. # +# # +# The BSD 2-Clause License # +# # +# Redistribution and use in source and binary forms, with or without # +# modification, are permitted provided that the following conditions are met:# +# # +# - Redistributions of source code must retain the above copyright notice, # +# this list of conditions and the following disclaimer. # +# # +# - Redistributions in binary form must reproduce the above copyright # +# notice, this list of conditions and the following disclaimer in the # +# documentation and/or other materials provided with the distribution. # +# # +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"# +# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE # +# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE # +# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE # +# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR # +# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF # +# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS # +# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN # +# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) # +# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF # +# THE POSSIBILITY OF SUCH DAMAGE. # +# ----------------------------------------------------------------------------# + +from linux_metrics import cpu_stat, disk_stat, net_stat + + +from liota.core.package_manager import LiotaPackage +from liota.lib.utilities.utility import get_default_network_interface, get_disk_name, read_user_config +from liota.lib.utilities.utility import read_user_config + +dependencies = ["wavefront"] + +# Getting edge_system's network interface and disk name + +# There are situations where route may not actually return a default route in the +# main routing table, as the default route might be kept in another table. +# Such cases should be handled manually. +network_interface = get_default_network_interface() +# If edge_system has multiple disks, only first disk will be returned. +# Such cases should be handled manually. +disk_name = get_disk_name() + +# --------------------------------------------------------------------------- +# This is a sample application package to publish edge system stats data to +# wavefront using MQTT protocol as DCC Comms +# User defined methods + + +def read_cpu_procs(): + return cpu_stat.procs_running() + + +def read_cpu_utilization(sample_duration_sec=1): + cpu_pcts = cpu_stat.cpu_percents(sample_duration_sec) + return round((100 - cpu_pcts['idle']), 2) + + +def read_disk_usage_stats(): + return round(disk_stat.disk_reads_writes(disk_name)[0], 2) + + +def read_network_bytes_received(): + return round(net_stat.rx_tx_bytes(network_interface)[0], 2) + + +class PackageClass(LiotaPackage): + def run(self, registry): + import copy + from liota.entities.metrics.metric import Metric + from liota.lib.transports.mqtt import MqttMessagingAttributes + + # Acquire resources from registry + wavefront_edge_system = copy.copy(registry.get("wavefront_edge_system")) + wavefront = registry.get("wavefront") + + # Get values from configuration file + config_path = registry.get("package_conf") + config = read_user_config(config_path + '/sampleProp.conf') + + # Create metrics + self.metrics = [] + metric_name = "CPU-Utilization" + metric_cpu_utilization = Metric(name=metric_name, + unit=None, + interval=2, + aggregation_size=1, + sampling_function=read_cpu_utilization + ) + reg_metric_cpu_utilization = wavefront.register(metric_cpu_utilization) + wavefront.create_relationship(wavefront_edge_system, reg_metric_cpu_utilization) + reg_metric_cpu_utilization.start_collecting() + self.metrics.append(reg_metric_cpu_utilization) + + metric_name = "CPU-Process" + metric_cpu_procs = Metric(name=metric_name, + unit=None, + interval=2, + aggregation_size=1, + sampling_function=read_cpu_procs + ) + reg_metric_cpu_procs = wavefront.register(metric_cpu_procs) + wavefront.create_relationship(wavefront_edge_system, reg_metric_cpu_procs) + reg_metric_cpu_procs.start_collecting() + self.metrics.append(reg_metric_cpu_procs) + + metric_name = "Disk-Usage-Stats" + metric_disk_usage_stats = Metric(name=metric_name, + unit=None, + interval=2, + aggregation_size=1, + sampling_function=read_disk_usage_stats + ) + reg_metric_disk_usage_stats = wavefront.register(metric_disk_usage_stats) + wavefront.create_relationship(wavefront_edge_system, reg_metric_disk_usage_stats) + reg_metric_disk_usage_stats.start_collecting() + self.metrics.append(reg_metric_disk_usage_stats) + + metric_name = "Network-Bytes-Received" + metric_network_bytes_received = Metric(name=metric_name, + unit=None, + interval=2, + aggregation_size=1, + sampling_function=read_network_bytes_received + ) + reg_metric_network_bytes_received = wavefront.register(metric_network_bytes_received) + wavefront.create_relationship(wavefront_edge_system, reg_metric_network_bytes_received) + reg_metric_network_bytes_received.start_collecting() + self.metrics.append(reg_metric_network_bytes_received) + + def clean_up(self): + for metric in self.metrics: + metric.stop_collecting() diff --git a/packages/examples/opentsdb_bike_simulated.py b/packages/examples/opentsdb_bike_simulated.py new file mode 100644 index 00000000..4bebc9c5 --- /dev/null +++ b/packages/examples/opentsdb_bike_simulated.py @@ -0,0 +1,180 @@ +# -*- coding: utf-8 -*- +# ----------------------------------------------------------------------------# +# Copyright © 2015-2016 VMware, Inc. All Rights Reserved. # +# # +# Licensed under the BSD 2-Clause License (the “License”); you may not use # +# this file except in compliance with the License. # +# # +# The BSD 2-Clause License # +# # +# Redistribution and use in source and binary forms, with or without # +# modification, are permitted provided that the following conditions are met:# +# # +# - Redistributions of source code must retain the above copyright notice, # +# this list of conditions and the following disclaimer. # +# # +# - Redistributions in binary form must reproduce the above copyright # +# notice, this list of conditions and the following disclaimer in the # +# documentation and/or other materials provided with the distribution. # +# # +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"# +# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE # +# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE # +# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE # +# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR # +# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF # +# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS # +# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN # +# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) # +# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF # +# THE POSSIBILITY OF SUCH DAMAGE. # +# ----------------------------------------------------------------------------# + +from liota.core.package_manager import LiotaPackage + +dependencies = ["opentsdb", "examples/bike_simulator"] + + +def static_vars(**kwargs): + def decorate(func): + for k in kwargs: + setattr(func, k, kwargs[k]) + return func + return decorate + + +class PackageClass(LiotaPackage): + + def create_udm(self, bike_model): + ureg = bike_model.ureg + + import time + import math + + #------------------------------------------------------------------- + # The following functions operate on physical variables represented + # in pint objects, and returns a pint object, too. + # Decorators provided by the pint library are used to check the + # dimensions of arguments passed to the functions. + + @ureg.check(ureg.rpm, ureg.m) + def get_speed(revolution, radius): + return revolution * radius + + @ureg.check(ureg.m / ureg.sec) + @static_vars(speed_last=None, time_last=None) + def get_acceleration(speed): + t = time.time() + if get_acceleration.time_last is None: + acc = 0 * ureg.m / ureg.sec ** 2 + else: + acc = (speed - get_acceleration.speed_last) / \ + ((t - get_acceleration.time_last) * ureg.sec) + get_acceleration.speed_last = speed + get_acceleration.time_last = t + return acc + + @ureg.check(ureg.m ** 2, ureg.m / ureg.sec, ureg.kg / ureg.m ** 3) + def get_resistance(area, speed, k): + return (k * area * speed ** 2) + + @ureg.check(ureg.kg, ureg.m / ureg.sec ** 2) + def get_force(mass, acceleration): + return mass * acceleration + + @ureg.check(ureg.newton, ureg.m / ureg.sec) + def get_power(force, speed): + return force * speed + + #------------------------------------------------------------------- + # This is a sampling method, which queries the physical model, and + # calls the physical functions to calculate a desired variable. + + def get_bike_speed(): + speed = get_speed( + bike_model.get_revolution(), + bike_model.get_radius_wheel() + ).to(ureg.m / ureg.sec) + return speed.magnitude + + #------------------------------------------------------------------- + # This is a more complex sampling method, which queries the physical + # model. + + def get_bike_power(): + weight_total = bike_model.get_weight_bike() + \ + bike_model.get_weight_rider() + \ + bike_model.get_weight_load() + speed = get_speed( + bike_model.get_revolution(), + bike_model.get_radius_wheel() + ) + power_acceleration = get_power( + get_force( + weight_total, + get_acceleration(speed) + ), + speed + ).to(ureg.watt) + power_gravity = get_power( + get_force( + weight_total, + 9.8 * ureg.m / ureg.sec ** 2 + ), + speed * math.sin(bike_model.get_slope()) + ).to(ureg.watt) + power_resistance = get_power( + get_resistance( + bike_model.get_area(), + speed, + 10 * ureg.kg / ureg.m ** 3 + ).to(ureg.newton), + speed + ).to(ureg.watt) + power = power_acceleration + power_gravity + power_resistance + return power.to(ureg.watt).magnitude + + self.get_bike_speed = get_bike_speed + self.get_bike_power = get_bike_power + + def run(self, registry): + from liota.entities.metrics.metric import Metric + + # Acquire resources from registry + opentsdb = registry.get("opentsdb") + bike_simulator = registry.get("bike_simulator") + opentsdb_bike = opentsdb.register(bike_simulator) + + ureg = bike_simulator.ureg + self.create_udm(bike_model=bike_simulator) + + # Create metrics + self.metrics = [] + + metric_name = "speed" + bike_speed = Metric( + name=metric_name, + unit=(ureg.m / ureg.sec), + interval=5, + sampling_function=self.get_bike_speed + ) + reg_bike_speed = opentsdb.register(bike_speed) + opentsdb.create_relationship(opentsdb_bike, reg_bike_speed) + reg_bike_speed.start_collecting() + self.metrics.append(reg_bike_speed) + + metric_name = "power" + bike_power = Metric( + name=metric_name, + unit=ureg.watt, + interval=5, + sampling_function=self.get_bike_power + ) + reg_bike_power = opentsdb.register(bike_power) + opentsdb.create_relationship(opentsdb_bike, reg_bike_power) + reg_bike_power.start_collecting() + self.metrics.append(reg_bike_power) + + def clean_up(self): + for metric in self.metrics: + metric.stop_collecting() diff --git a/packages/examples/opentsdb_edge_system_stats.py b/packages/examples/opentsdb_edge_system_stats.py new file mode 100644 index 00000000..7eaa9f94 --- /dev/null +++ b/packages/examples/opentsdb_edge_system_stats.py @@ -0,0 +1,141 @@ +# -*- coding: utf-8 -*- +# ----------------------------------------------------------------------------# +# Copyright © 2015-2016 VMware, Inc. All Rights Reserved. # +# # +# Licensed under the BSD 2-Clause License (the “License”); you may not use # +# this file except in compliance with the License. # +# # +# The BSD 2-Clause License # +# # +# Redistribution and use in source and binary forms, with or without # +# modification, are permitted provided that the following conditions are met:# +# # +# - Redistributions of source code must retain the above copyright notice, # +# this list of conditions and the following disclaimer. # +# # +# - Redistributions in binary form must reproduce the above copyright # +# notice, this list of conditions and the following disclaimer in the # +# documentation and/or other materials provided with the distribution. # +# # +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"# +# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE # +# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE # +# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE # +# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR # +# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF # +# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS # +# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN # +# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) # +# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF # +# THE POSSIBILITY OF SUCH DAMAGE. # +# ----------------------------------------------------------------------------# + +from linux_metrics import cpu_stat, disk_stat, net_stat + + +from liota.core.package_manager import LiotaPackage +from liota.lib.utilities.utility import get_default_network_interface, get_disk_name, read_user_config +from liota.lib.utilities.utility import read_user_config + +dependencies = ["opentsdb"] + +# Getting edge_system's network interface and disk name + +# There are situations where route may not actually return a default route in the +# main routing table, as the default route might be kept in another table. +# Such cases should be handled manually. +network_interface = get_default_network_interface() +# If edge_system has multiple disks, only first disk will be returned. +# Such cases should be handled manually. +disk_name = get_disk_name() + +# --------------------------------------------------------------------------- +# This is a sample application package to publish edge system stats data to +# opentsdb using MQTT protocol as DCC Comms +# User defined methods + + +def read_cpu_procs(): + return cpu_stat.procs_running() + + +def read_cpu_utilization(sample_duration_sec=1): + cpu_pcts = cpu_stat.cpu_percents(sample_duration_sec) + return round((100 - cpu_pcts['idle']), 2) + + +def read_disk_usage_stats(): + return round(disk_stat.disk_reads_writes(disk_name)[0], 2) + + +def read_network_bytes_received(): + return round(net_stat.rx_tx_bytes(network_interface)[0], 2) + + +class PackageClass(LiotaPackage): + def run(self, registry): + import copy + from liota.entities.metrics.metric import Metric + from liota.lib.transports.mqtt import MqttMessagingAttributes + + # Acquire resources from registry + opentsdb_edge_system = copy.copy(registry.get("opentsdb_edge_system")) + opentsdb = registry.get("opentsdb") + + # Get values from configuration file + config_path = registry.get("package_conf") + config = read_user_config(config_path + '/sampleProp.conf') + + # Create metrics + self.metrics = [] + metric_name = "CPU-Utilization" + metric_cpu_utilization = Metric(name=metric_name, + unit=None, + interval=2, + aggregation_size=1, + sampling_function=read_cpu_utilization + ) + reg_metric_cpu_utilization = opentsdb.register(metric_cpu_utilization) + opentsdb.create_relationship(opentsdb_edge_system, reg_metric_cpu_utilization) + reg_metric_cpu_utilization.start_collecting() + self.metrics.append(reg_metric_cpu_utilization) + + metric_name = "CPU-Process" + metric_cpu_procs = Metric(name=metric_name, + unit=None, + interval=2, + aggregation_size=1, + sampling_function=read_cpu_procs + ) + reg_metric_cpu_procs = opentsdb.register(metric_cpu_procs) + opentsdb.create_relationship(opentsdb_edge_system, reg_metric_cpu_procs) + reg_metric_cpu_procs.start_collecting() + self.metrics.append(reg_metric_cpu_procs) + + metric_name = "Disk-Usage-Stats" + metric_disk_usage_stats = Metric(name=metric_name, + unit=None, + interval=2, + aggregation_size=1, + sampling_function=read_disk_usage_stats + ) + reg_metric_disk_usage_stats = opentsdb.register(metric_disk_usage_stats) + opentsdb.create_relationship(opentsdb_edge_system, reg_metric_disk_usage_stats) + reg_metric_disk_usage_stats.start_collecting() + self.metrics.append(reg_metric_disk_usage_stats) + + metric_name = "Network-Bytes-Received" + metric_network_bytes_received = Metric(name=metric_name, + unit=None, + interval=2, + aggregation_size=1, + sampling_function=read_network_bytes_received + ) + reg_metric_network_bytes_received = opentsdb.register(metric_network_bytes_received) + opentsdb.create_relationship(opentsdb_edge_system, reg_metric_network_bytes_received) + reg_metric_network_bytes_received.start_collecting() + self.metrics.append(reg_metric_network_bytes_received) + + def clean_up(self): + for metric in self.metrics: + metric.stop_collecting() diff --git a/packages/influx.py b/packages/influx.py new file mode 100644 index 00000000..61f5f445 --- /dev/null +++ b/packages/influx.py @@ -0,0 +1,86 @@ +# -*- coding: utf-8 -*- +# ----------------------------------------------------------------------------# +# Copyright © 2015-2016 VMware, Inc. All Rights Reserved. # +# # +# Licensed under the BSD 2-Clause License (the “License”); you may not use # +# this file except in compliance with the License. # +# # +# The BSD 2-Clause License # +# # +# Redistribution and use in source and binary forms, with or without # +# modification, are permitted provided that the following conditions are met:# +# # +# - Redistributions of source code must retain the above copyright notice, # +# this list of conditions and the following disclaimer. # +# # +# - Redistributions in binary form must reproduce the above copyright # +# notice, this list of conditions and the following disclaimer in the # +# documentation and/or other materials provided with the distribution. # +# # +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"# +# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE # +# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE # +# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE # +# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR # +# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF # +# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS # +# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN # +# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) # +# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF # +# THE POSSIBILITY OF SUCH DAMAGE. # +# ----------------------------------------------------------------------------# + +from liota.core.package_manager import LiotaPackage +from liota.lib.utilities.utility import read_user_config + +dependencies = ["edge_systems/dell5k/edge_system"] + + +class PackageClass(LiotaPackage): + """ + This package creates a Graphite DCC object and registers system on + Graphite to acquire "registered edge system", i.e. graphite_edge_system. + """ + + def run(self, registry): + import copy + from liota.dccs.influx import influx + from liota.dcc_comms.mqtt_dcc_comms import MqttDccComms + from liota.lib.transports.mqtt import MqttMessagingAttributes + from liota.lib.transports.mqtt import QoSDetails + from liota.lib.utilities.identity import Identity + from liota.lib.utilities.tls_conf import TLSConf + + # Acquire resources from registry + # Creating a copy of system object to keep original object "clean" + edge_system = copy.copy(registry.get("edge_system")) + + # Get values from configuration file + config_path = registry.get("package_conf") + config = read_user_config(config_path + '/sampleProp.conf') + + # Initialize DCC object with transport + identity = Identity(root_ca_cert=config['broker_root_ca_cert'], username=config['broker_username'], password=['broker_password'], + cert_file=None, key_file=None) + # Encapsulate TLS parameters + tls_conf = TLSConf(config['cert_required'], config['tls_version'], config['cipher']) + # Encapsulate QoS related parameters + qos_details = QoSDetails(config['in_flight'], config['queue_size'], config['retry']) + + # Connecting to emqtt broker + self.influx = influx(MqttDccComms(edge_system_name=edge_system.name, + url=config['BrokerIP'], port=config['BrokerPort'], identity=identity, + tls_conf=tls_conf, + qos_details=qos_details, + clean_session=True, + protocol=config['protocol'], transport=['transport'], + conn_disconn_timeout=config['ConnectDisconnectTimeout'])) + + # Register gateway system + influx_edge_system = self.influx.register(edge_system) + + registry.register("influx", self.influx) + registry.register("influx_edge_system", influx_edge_system) + + def clean_up(self): + self.influx.comms.client = None diff --git a/packages/opentsdb.py b/packages/opentsdb.py new file mode 100644 index 00000000..4410a1e9 --- /dev/null +++ b/packages/opentsdb.py @@ -0,0 +1,71 @@ +# -*- coding: utf-8 -*- +# ----------------------------------------------------------------------------# +# Copyright © 2015-2016 VMware, Inc. All Rights Reserved. # +# # +# Licensed under the BSD 2-Clause License (the “License”); you may not use # +# this file except in compliance with the License. # +# # +# The BSD 2-Clause License # +# # +# Redistribution and use in source and binary forms, with or without # +# modification, are permitted provided that the following conditions are met:# +# # +# - Redistributions of source code must retain the above copyright notice, # +# this list of conditions and the following disclaimer. # +# # +# - Redistributions in binary form must reproduce the above copyright # +# notice, this list of conditions and the following disclaimer in the # +# documentation and/or other materials provided with the distribution. # +# # +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"# +# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE # +# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE # +# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE # +# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR # +# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF # +# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS # +# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN # +# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) # +# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF # +# THE POSSIBILITY OF SUCH DAMAGE. # +# ----------------------------------------------------------------------------# + +from liota.core.package_manager import LiotaPackage +from liota.lib.utilities.utility import read_user_config + +dependencies = ["edge_systems/dell5k/edge_system"] + + +class PackageClass(LiotaPackage): + """ + This package creates a opentsdb DCC object and registers system on + opentsdb to acquire "registered edge system", i.e. opentsdb_edge_system. + """ + + def run(self, registry): + import copy + from liota.dccs.opentsdb import opentsdb + from liota.dcc_comms.socket_comms import SocketDccComms + + # Acquire resources from registry + # Creating a copy of system object to keep original object "clean" + edge_system = copy.copy(registry.get("edge_system")) + + # Get values from configuration file + config_path = registry.get("package_conf") + config = read_user_config(config_path + '/sampleProp.conf') + + # Initialize DCC object with transport + self.opentsdb = opentsdb( + SocketDccComms(ip=config['opentsdbIP'], + port=config['opentsdbPort']) + ) + + # Register gateway system + opentsdb_edge_system = self.opentsdb.register(edge_system) + + registry.register("opentsdb", self.opentsdb) + registry.register("opentsdb_edge_system", opentsdb_edge_system) + + def clean_up(self): + self.opentsdb.comms.client.close() diff --git a/packages/sampleProp.conf b/packages/sampleProp.conf index 3c1c1948..70e85d59 100644 --- a/packages/sampleProp.conf +++ b/packages/sampleProp.conf @@ -25,6 +25,11 @@ ClientCertFile = None GraphiteIP = "Graphite-IP" GraphitePort = None +#### [opentsdb] #### + +opentsdbIP = "opentsdb-IP" +opentsdbPort = None + #### [GENERICMQTT] #### BrokerIP = "Broker Name or IP" @@ -48,4 +53,3 @@ CustomPubTopic = "home/gateway/metrics" LivingRoomTemperatureTopic = "home/living-room/temperature" LivingRoomHumidityTopic = "home/living-room/humidity" LivingRoomLightTopic = "home/living-room/light" - diff --git a/packages/wavefront.py b/packages/wavefront.py new file mode 100644 index 00000000..c80b9ab1 --- /dev/null +++ b/packages/wavefront.py @@ -0,0 +1,86 @@ +# -*- coding: utf-8 -*- +# ----------------------------------------------------------------------------# +# Copyright © 2015-2016 VMware, Inc. All Rights Reserved. # +# # +# Licensed under the BSD 2-Clause License (the “License”); you may not use # +# this file except in compliance with the License. # +# # +# The BSD 2-Clause License # +# # +# Redistribution and use in source and binary forms, with or without # +# modification, are permitted provided that the following conditions are met:# +# # +# - Redistributions of source code must retain the above copyright notice, # +# this list of conditions and the following disclaimer. # +# # +# - Redistributions in binary form must reproduce the above copyright # +# notice, this list of conditions and the following disclaimer in the # +# documentation and/or other materials provided with the distribution. # +# # +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"# +# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE # +# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE # +# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE # +# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR # +# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF # +# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS # +# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN # +# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) # +# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF # +# THE POSSIBILITY OF SUCH DAMAGE. # +# ----------------------------------------------------------------------------# + +from liota.core.package_manager import LiotaPackage +from liota.lib.utilities.utility import read_user_config + +dependencies = ["edge_systems/dell5k/edge_system"] + + +class PackageClass(LiotaPackage): + """ + This package creates a Graphite DCC object and registers system on + Graphite to acquire "registered edge system", i.e. graphite_edge_system. + """ + + def run(self, registry): + import copy + from liota.dccs.wavefront import Wavefront + from liota.dcc_comms.mqtt_dcc_comms import MqttDccComms + from liota.lib.transports.mqtt import MqttMessagingAttributes + from liota.lib.transports.mqtt import QoSDetails + from liota.lib.utilities.identity import Identity + from liota.lib.utilities.tls_conf import TLSConf + + # Acquire resources from registry + # Creating a copy of system object to keep original object "clean" + edge_system = copy.copy(registry.get("edge_system")) + + # Get values from configuration file + config_path = registry.get("package_conf") + config = read_user_config(config_path + '/sampleProp.conf') + + # Initialize DCC object with transport + identity = Identity(root_ca_cert=config['broker_root_ca_cert'], username=config['broker_username'], password=['broker_password'], + cert_file=None, key_file=None) + # Encapsulate TLS parameters + tls_conf = TLSConf(config['cert_required'], config['tls_version'], config['cipher']) + # Encapsulate QoS related parameters + qos_details = QoSDetails(config['in_flight'], config['queue_size'], config['retry']) + + # Connecting to emqtt broker + self.wavefront = Wavefront(MqttDccComms(edge_system_name=edge_system.name, + url=config['BrokerIP'], port=config['BrokerPort'], identity=identity, + tls_conf=tls_conf, + qos_details=qos_details, + clean_session=True, + protocol=config['protocol'], transport=['transport'], + conn_disconn_timeout=config['ConnectDisconnectTimeout'])) + + # Register gateway system + wavefront_edge_system = self.wavefront.register(edge_system) + + registry.register("wavefront", self.wavefront) + registry.register("wavefront_edge_system", wavefront_edge_system) + + def clean_up(self): + self.wavefront.comms.client = None