diff --git a/tests/unit/__init__.py b/tests/unit/__init__.py new file mode 100644 index 00000000..4badd4a2 --- /dev/null +++ b/tests/unit/__init__.py @@ -0,0 +1,31 @@ +# -*- coding: utf-8 -*- +# ----------------------------------------------------------------------------# +# Copyright © 2015-2016 VMware, Inc. All Rights Reserved. # +# # +# Licensed under the BSD 2-Clause License (the “License”); you may not use # +# this file except in compliance with the License. # +# # +# The BSD 2-Clause License # +# # +# Redistribution and use in source and binary forms, with or without # +# modification, are permitted provided that the following conditions are met:# +# # +# - Redistributions of source code must retain the above copyright notice, # +# this list of conditions and the following disclaimer. # +# # +# - Redistributions in binary form must reproduce the above copyright # +# notice, this list of conditions and the following disclaimer in the # +# documentation and/or other materials provided with the distribution. # +# # +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"# +# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE # +# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE # +# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE # +# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR # +# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF # +# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS # +# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN # +# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) # +# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF # +# THE POSSIBILITY OF SUCH DAMAGE. # +# ----------------------------------------------------------------------------# diff --git a/tests/unit/dcc_comms/__init__.py b/tests/unit/dcc_comms/__init__.py new file mode 100644 index 00000000..4badd4a2 --- /dev/null +++ b/tests/unit/dcc_comms/__init__.py @@ -0,0 +1,31 @@ +# -*- coding: utf-8 -*- +# ----------------------------------------------------------------------------# +# Copyright © 2015-2016 VMware, Inc. All Rights Reserved. # +# # +# Licensed under the BSD 2-Clause License (the “License”); you may not use # +# this file except in compliance with the License. # +# # +# The BSD 2-Clause License # +# # +# Redistribution and use in source and binary forms, with or without # +# modification, are permitted provided that the following conditions are met:# +# # +# - Redistributions of source code must retain the above copyright notice, # +# this list of conditions and the following disclaimer. # +# # +# - Redistributions in binary form must reproduce the above copyright # +# notice, this list of conditions and the following disclaimer in the # +# documentation and/or other materials provided with the distribution. # +# # +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"# +# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE # +# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE # +# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE # +# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR # +# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF # +# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS # +# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN # +# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) # +# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF # +# THE POSSIBILITY OF SUCH DAMAGE. # +# ----------------------------------------------------------------------------# diff --git a/tests/unit/dcc_comms/test_amqp_dcc_comms.py b/tests/unit/dcc_comms/test_amqp_dcc_comms.py new file mode 100644 index 00000000..25610524 --- /dev/null +++ b/tests/unit/dcc_comms/test_amqp_dcc_comms.py @@ -0,0 +1,427 @@ +# -*- coding: utf-8 -*- +# ----------------------------------------------------------------------------# +# Copyright © 2015-2016 VMware, Inc. All Rights Reserved. # +# # +# Licensed under the BSD 2-Clause License (the “License”); you may not use # +# this file except in compliance with the License. # +# # +# The BSD 2-Clause License # +# # +# Redistribution and use in source and binary forms, with or without # +# modification, are permitted provided that the following conditions are met:# +# # +# - Redistributions of source code must retain the above copyright notice, # +# this list of conditions and the following disclaimer. # +# # +# - Redistributions in binary form must reproduce the above copyright # +# notice, this list of conditions and the following disclaimer in the # +# documentation and/or other materials provided with the distribution. # +# # +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"# +# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE # +# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE # +# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE # +# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR # +# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF # +# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS # +# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN # +# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) # +# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF # +# THE POSSIBILITY OF SUCH DAMAGE. # +# ----------------------------------------------------------------------------# + +import os +import unittest +from ConfigParser import ConfigParser + +import mock + +from liota.dcc_comms.amqp_dcc_comms import * +from liota.entities.edge_systems.dell5k_edge_system import Dell5KEdgeSystem +from liota.lib.utilities.identity import Identity +from liota.lib.utilities.tls_conf import TLSConf +from liota.lib.utilities.utility import read_liota_config +from liota.lib.utilities.utility import systemUUID + + +# Sample function +def sampling_function(): + pass + + +# Callback function +def test_callback(): + pass + + +class AmqpDccCommsTest(unittest.TestCase): + """ + AmqpDccComms unit test cases + """ + + def setUp(self): + """ + Setup all required parameters for AmqpDccComms tests + :return: None + """ + # ConfigParser to parse ini file + self.config = ConfigParser() + self.uuid_file = read_liota_config('UUID_PATH', 'uuid_path') + + # Broker details + self.url = "Broker-IP" + self.port = "Broker-Port" + self.amqp_username = "test" + self.amqp_password = "test" + self.enable_authentication = True + self.transport = "tcp" + self.connection_disconnect_timeout_sec = 2 + + # EdgeSystem name + self.edge_system = Dell5KEdgeSystem("TestGateway") + + # TLS configurations + self.root_ca_cert = "/etc/liota/amqp/conf/ca.crt" + self.client_cert_file = "/etc/liota/amqp/conf/client.crt" + self.client_key_file = "/etc/liota/amqp/conf/client.key" + self.cert_required = "CERT_REQUIRED" + self.tls_version = "PROTOCOL_TLSv1" + self.cipher = None + + # Encapsulate the authentication details + self.identity = Identity(self.root_ca_cert, self.amqp_username, self.amqp_password, + self.client_cert_file, self.client_key_file) + + # Encapsulate TLS parameters + self.tls_conf = TLSConf(self.cert_required, self.tls_version, self.cipher) + + self.send_message = "test-message" + + # Creating messaging attributes + self.amqp_msg_attr = AmqpPublishMessagingAttributes(exchange_name="test_exchange", routing_key=["test"]) + + with mock.patch.object(Amqp, '_init_or_re_init') as mocked_init_or_re_init, \ + mock.patch.object(Amqp, "declare_publish_exchange") as mocked_declare_publish_exchange: + # Instantiate client for DCC communication + self.client = AmqpDccComms(edge_system_name=self.edge_system.name, url=self.url, port=self.port) + + def tearDown(self): + """ + Clean up all parameters used in test cases + :return: None + """ + + # Check path exists + if os.path.exists(self.uuid_file): + try: + # Remove the file + os.remove(self.uuid_file) + except OSError as e: + log.error("Unable to remove UUID file" + str(e)) + + self.config = None + self.uuid_file = None + self.edge_system = None + self.url = None + self.port = None + self.amqp_username = None + self.amqp_password = None + self.enable_authentication = None + self.connection_disconnect_timeout_sec = None + + # EdgeSystem name + self.edge_system = None + + # TLS configurations + self.root_ca_cert = None + self.client_cert_file = None + self.client_key_file = None + self.cert_required = None + self.tls_version = None + self.cipher = None + + # Identity + self.identity = None + + # TLS configurations + self.tls_conf = None + + self.send_message = None + self.client = None + + @mock.patch.object(Amqp, "declare_publish_exchange") + @mock.patch.object(Amqp, "__init__") + def test_init_implementation(self, mocked_init, mocked_declare_publish_exchange): + """ + Test AmqpDccComms implementation without publish messaging attributes. + :param mocked_init: Mocked init from Amqp class + :param mocked_init: Mocked declare_publish_exchange from Amqp class + :return: None + """ + + # Mocked init method + mocked_init.return_value = None + + # Instantiate client for DCC communication + self.amqp_client = AmqpDccComms(edge_system_name=self.edge_system.name, url=self.url, port=self.port, + enable_authentication=self.enable_authentication, + tls_conf=self.tls_conf, identity=self.identity, + connection_timeout_sec=self.connection_disconnect_timeout_sec) + + # Read uuid.ini + self.config.read(self.uuid_file) + + edge_system_name = self.config.get('GATEWAY', 'name') + local_uuid = self.config.get('GATEWAY', 'local-uuid') + + # Compare stored edge system name + self.assertEquals(edge_system_name, self.edge_system.name) + + # Compare stored client-id + self.assertEquals(local_uuid, systemUUID().get_uuid(self.edge_system.name)) + + # Check Amqp init called method call has been made + mocked_init.assert_called() + + # Check declare publish exchange has been called + mocked_declare_publish_exchange.assert_called_with(self.amqp_client.pub_msg_attr) + + @mock.patch.object(Amqp, "declare_publish_exchange") + @mock.patch.object(Amqp, "__init__") + def test_init_implementation_without_msg_attr(self, mocked_init, mocked_declare_publish_exchange): + """ + Test AmqpDccComms implementation without publish messaging attributes. + :param mocked_init: Mocked init from Amqp class + :param mocked_init: Mocked declare_publish_exchange from Amqp class + :return: None + """ + + # Mocked init method + mocked_init.return_value = None + + # Instantiate client for DCC communication + self.amqp_client = AmqpDccComms(edge_system_name=self.edge_system.name, url=self.url, port=self.port, + enable_authentication=self.enable_authentication, + tls_conf=self.tls_conf, identity=self.identity, + connection_timeout_sec=self.connection_disconnect_timeout_sec) + + # Read uuid.ini + self.config.read(self.uuid_file) + + edge_system_name = self.config.get('GATEWAY', 'name') + local_uuid = self.config.get('GATEWAY', 'local-uuid') + + # Compare stored edge system name + self.assertEquals(edge_system_name, self.edge_system.name) + + # Compare stored client-id + self.assertEquals(local_uuid, systemUUID().get_uuid(self.edge_system.name)) + + # Check Amqp init called method call has been made + mocked_init.assert_called_with(self.url, self.port, self.identity, self.tls_conf, self.enable_authentication, + self.connection_disconnect_timeout_sec) + + # Check declare publish exchange has been called + mocked_declare_publish_exchange.assert_called_with(self.amqp_client.pub_msg_attr) + + @mock.patch.object(Amqp, "declare_publish_exchange") + @mock.patch.object(Amqp, "__init__") + def test_init_implementation_with_msg_attr(self, mocked_init, mocked_declare_publish_exchange): + """ + Test AmqpDccComms implementation with publish messaging attributes. + :param mocked_init: Mocked init from Amqp class + :param mocked_init: Mocked declare_publish_exchange from Amqp class + :return: None + """ + + # Mocked init method + mocked_init.return_value = None + + # Check path exists + if os.path.exists(self.uuid_file): + try: + # Remove the file + os.remove(self.uuid_file) + except OSError as e: + log.error("Unable to remove UUID file" + str(e)) + + # Instantiate client for DCC communication + self.amqp_client = AmqpDccComms(edge_system_name=self.edge_system.name, url=self.url, port=self.port, + enable_authentication=self.enable_authentication, + tls_conf=self.tls_conf, identity=self.identity, + amqp_pub_msg_attr=self.amqp_msg_attr, + connection_timeout_sec=self.connection_disconnect_timeout_sec) + + # Check uuid file get created or not + self.assertFalse(os.path.exists(self.uuid_file)) + + # Check Amqp init called method call has been made + mocked_init.assert_called_with(self.url, self.port, self.identity, self.tls_conf, self.enable_authentication, + self.connection_disconnect_timeout_sec) + + # Check declare publish exchange has been called + mocked_declare_publish_exchange.assert_called_with(self.amqp_msg_attr) + + def test_init_validation(self): + """ + Test the validation of init method for Publish messaging attributes + :return: None + """ + + with self.assertRaises(TypeError): + # Pass invalid amqp_pub_msg_attr attribute and check implemetation rasing the TypeError + AmqpDccComms(self.edge_system, self.url, self.port, amqp_pub_msg_attr=True) + + @mock.patch.object(Amqp, "disconnect") + def test_disconnect_implementation(self, mocked_disconnect): + """ + + :param mocked_disconnect: Mocked disconnect from Amqp class + :return: None + """ + + # Call _disconnect method + self.client._disconnect() + + # Check implementation calling the mocked method + mocked_disconnect.assert_called() + + @mock.patch.object(Amqp, "disconnect_consumer") + def test_stop_receiving_implementation(self, mocked_disconnect_consumer): + """ + Test the implementation of stop_receiving + :param mocked_disconnect_consumer: Mocked disconnect_consumer consumer from Amqp class + :return: None + """ + + # Call _disconnect method + self.client.stop_receiving() + + # Check implementation calling the mocked method + mocked_disconnect_consumer.assert_called() + + def test_validation_receive(self): + """ + Test the validation of consume_msg_attr_list attributes from receive method. + :return: None + """ + + with self.assertRaises(TypeError): + # Call receive with invalid params + self.client.receive(True, None) + + # Create Amap consumer messaging attributes + AmqpConsumeMessagingAttributes(exchange_name="test_exchange", + routing_keys=["test"]) + + @mock.patch.object(Amqp, "consume") + def test_implementation_receive(self, mocked_consume): + """ + Test the implementation of consume_msg_attr_list attributes without auto_gen_callback from receive method. + :return: None + """ + + # Create Amap consumer messaging attributes + amqp_consumer_msg_attr = [AmqpConsumeMessagingAttributes(exchange_name="test_exchange", + routing_keys=["test"])] + + # Call receive with invalid params + self.client.receive(amqp_consumer_msg_attr) + + # Check consume called with following params + mocked_consume.assert_called_with(amqp_consumer_msg_attr) + + @mock.patch.object(Amqp, "consume") + def test_implementation_receive(self, mocked_consume): + """ + Test the implementation of consume_msg_attr_list attributes without auto_gen_callback from receive method. + :param mocked_consume: Mocked consume method of Amqp class + :return: + """ + + # Create Amap consumer messaging attributes + amqp_consumer_msg_attr = [AmqpConsumeMessagingAttributes(exchange_name="test_exchange", + routing_keys=["test"])] + + # Call receive with invalid params + self.client.receive(amqp_consumer_msg_attr, test_callback) + + # Check consume called with following params + mocked_consume.assert_called_with(amqp_consumer_msg_attr) + + # Check length of the array + self.assertEquals(2, len(amqp_consumer_msg_attr)) + + @mock.patch.object(Amqp, "publish") + def test_send_without_pub_msg_attr(self, mocked_publish): + """ + Test the implementation of publish without pub_msg_attr + :param mocked_publish: Mocked publish method of Amqp class + :return: None + """ + with mock.patch.object(Amqp, '_init_or_re_init') as mocked_init_or_re_init, \ + mock.patch.object(Amqp, "declare_publish_exchange") as mocked_declare_publish_exchange: + # Instantiate client for DCC communication + self.client = AmqpDccComms(edge_system_name=self.edge_system.name, url=self.url, port=self.port, + enable_authentication=True, tls_conf=self.tls_conf, identity=self.identity, + amqp_pub_msg_attr=self.amqp_msg_attr) + + # Call send method with message + self.client.send(self.send_message) + + # Check publish called with following args + mocked_publish.assert_called_with(self.amqp_msg_attr.exchange_name, self.amqp_msg_attr.routing_key, + self.send_message, self.amqp_msg_attr.properties) + + @mock.patch.object(Amqp, "publish") + def test_send_with_pub_msg_attr(self, mocked_publish): + """ + Test the implementation of publish with pub_msg_attr + :param mocked_publish: Mocked publish method of Amqp class + :return: None + """ + with mock.patch.object(Amqp, '_init_or_re_init') as mocked_init_or_re_init, \ + mock.patch.object(Amqp, "declare_publish_exchange") as mocked_declare_publish_exchange: + # Creating messaging attributes + amqp_msg_attr = AmqpPublishMessagingAttributes(exchange_name="test_exchange_new", + routing_key=["test1", "test2"]) + + # Instantiate client for DCC communication + self.client = AmqpDccComms(edge_system_name=self.edge_system.name, url=self.url, port=self.port, + enable_authentication=True, tls_conf=self.tls_conf, identity=self.identity, + amqp_pub_msg_attr=self.amqp_msg_attr) + + # Call send method with message and amqp_msg_attr + self.client.send(self.send_message, amqp_msg_attr) + + # Check publish called with following args + mocked_publish.assert_called_with(amqp_msg_attr.exchange_name, amqp_msg_attr.routing_key, + self.send_message, amqp_msg_attr.properties) + + @mock.patch.object(Amqp, "publish") + def test_send_with_pub_msg_attr_exchange_none(self, mocked_publish): + """ + Test the implementation of publish with pub_msg_attr and exchange name as None + :param mocked_publish: Mocked publish method of Amqp class + :return: None + """ + with mock.patch.object(Amqp, '_init_or_re_init') as mocked_init_or_re_init, \ + mock.patch.object(Amqp, "declare_publish_exchange") as mocked_declare_publish_exchange: + # Creating messaging attributes + amqp_msg_attr = AmqpPublishMessagingAttributes(routing_key=["test1", "test2"]) + + # Instantiate client for DCC communication + self.client = AmqpDccComms(edge_system_name=self.edge_system.name, url=self.url, port=self.port, + enable_authentication=True, tls_conf=self.tls_conf, identity=self.identity, + amqp_pub_msg_attr=self.amqp_msg_attr) + + # Call send method with message and amqp_msg_attr + self.client.send(self.send_message, amqp_msg_attr) + + # Check publish called with following args + mocked_publish.assert_called_with(self.amqp_msg_attr.exchange_name, amqp_msg_attr.routing_key, + self.send_message, amqp_msg_attr.properties) + + +if __name__ == '__main__': + unittest.main(verbosity=1) diff --git a/tests/unit/dccs/__init__.py b/tests/unit/dccs/__init__.py new file mode 100644 index 00000000..4badd4a2 --- /dev/null +++ b/tests/unit/dccs/__init__.py @@ -0,0 +1,31 @@ +# -*- coding: utf-8 -*- +# ----------------------------------------------------------------------------# +# Copyright © 2015-2016 VMware, Inc. All Rights Reserved. # +# # +# Licensed under the BSD 2-Clause License (the “License”); you may not use # +# this file except in compliance with the License. # +# # +# The BSD 2-Clause License # +# # +# Redistribution and use in source and binary forms, with or without # +# modification, are permitted provided that the following conditions are met:# +# # +# - Redistributions of source code must retain the above copyright notice, # +# this list of conditions and the following disclaimer. # +# # +# - Redistributions in binary form must reproduce the above copyright # +# notice, this list of conditions and the following disclaimer in the # +# documentation and/or other materials provided with the distribution. # +# # +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"# +# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE # +# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE # +# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE # +# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR # +# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF # +# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS # +# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN # +# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) # +# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF # +# THE POSSIBILITY OF SUCH DAMAGE. # +# ----------------------------------------------------------------------------# diff --git a/tests/unit/dccs/test_rabbitmq.py b/tests/unit/dccs/test_rabbitmq.py new file mode 100644 index 00000000..13119450 --- /dev/null +++ b/tests/unit/dccs/test_rabbitmq.py @@ -0,0 +1,303 @@ +# -*- coding: utf-8 -*- +# ----------------------------------------------------------------------------# +# Copyright © 2015-2016 VMware, Inc. All Rights Reserved. # +# # +# Licensed under the BSD 2-Clause License (the “License”); you may not use # +# this file except in compliance with the License. # +# # +# The BSD 2-Clause License # +# # +# Redistribution and use in source and binary forms, with or without # +# modification, are permitted provided that the following conditions are met:# +# # +# - Redistributions of source code must retain the above copyright notice, # +# this list of conditions and the following disclaimer. # +# # +# - Redistributions in binary form must reproduce the above copyright # +# notice, this list of conditions and the following disclaimer in the # +# documentation and/or other materials provided with the distribution. # +# # +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"# +# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE # +# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE # +# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE # +# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR # +# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF # +# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS # +# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN # +# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) # +# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF # +# THE POSSIBILITY OF SUCH DAMAGE. # +# ----------------------------------------------------------------------------# + +import unittest + +import mock +import pint + +from liota.dcc_comms.amqp_dcc_comms import * +from liota.dccs.dcc import DataCenterComponent +from liota.dccs.rabbitmq import RabbitMQ +from liota.entities.edge_systems.dell5k_edge_system import Dell5KEdgeSystem +from liota.entities.metrics.metric import Metric +from liota.entities.metrics.registered_metric import RegisteredMetric +from liota.entities.registered_entity import RegisteredEntity + +# Create a pint unit registry +ureg = pint.UnitRegistry() + + +# Sample function +def sampling_function(): + pass + + +# Callback function +def test_callback(): + pass + + +class RabbitMQTest(unittest.TestCase): + """ + RabbitMQ unit test cases + """ + + def setUp(self): + """ + Method to initialise the RabbitMQ parameters. + :return: None + """ + + # EdgeSystem name + self.edge_system = Dell5KEdgeSystem("TestEdgeSystem") + + # Mocking the DataCenterComponent init method + with mock.patch("liota.dccs.rabbitmq.DataCenterComponent.__init__") as mocked_dcc: + self.rabbitmq_client = RabbitMQ("Mocked connection object", enclose_metadata=False) + + def tearDown(self): + """ + Method to cleanup the resource created during the execution of test case. + :return: None + """ + self.edge_system = None + self.rabbitmq_client = None + + def test_implementation(self): + """ + Test the implementation of init method + :return: None + """ + + # Check DataCenterComponent is base class of RabbitMQ + self.assertTrue(issubclass(RabbitMQ, DataCenterComponent)) + + # Check the enclosed_metadata + self.assertFalse(self.rabbitmq_client.enclose_metadata) + + def test_validation_of_comms_parameter(self): + """ + Test case to check the validation of RabbitMQ class for invalid connections object. + :return: None + """ + # Checking whether implementation raising the TypeError Exception for invalid comms object + with self.assertRaises(TypeError): + RabbitMQ("Invalid object") + + def test_implementation_register_entity(self): + """ + Test case to check the implementation of register method of RabbitMQ class for entity registration. + :return: None + """ + + # Register the edge + registered_entity = self.rabbitmq_client.register(self.edge_system) + + # Check the returned object is of the class RegisteredEntity + self.assertIsInstance(registered_entity, RegisteredEntity) + + def test_implementation_register_metric(self): + """ + Test case to check the implementation of register method of RabbitMQ for metric registration. + :return: None + """ + + # Creating test Metric + test_metric = Metric( + name="Test_Metric", + unit=None, + interval=10, + aggregation_size=2, + sampling_function=sampling_function + ) + + registered_metric = self.rabbitmq_client.register(test_metric) + + # Check the returned object is of the class RegisteredMetric + self.assertIsInstance(registered_metric, RegisteredMetric) + + def test_implementation_create_relationship(self): + """ + Test case to test RegisteredEntity as Parent and RegisteredMetric as child. + RegisteredEdgeSystem->RegisteredMetric + :return: None + """ + + # Register the edge + registered_entity = self.rabbitmq_client.register(self.edge_system) + + # Creating test Metric + test_metric = Metric( + name="Test_Metric", + unit=None, + interval=10, + aggregation_size=2, + sampling_function=sampling_function + ) + + registered_metric = self.rabbitmq_client.register(test_metric) + + # Creating the parent-child relationship + self.rabbitmq_client.create_relationship(registered_entity, registered_metric) + + self.assertEqual(registered_metric.parent, registered_entity, "Check the implementation of create_relationship") + + def test_validation_create_relationship_metric_device(self): + """ + Test case to test validation for RegisteredMetric as Parent and RegisteredEntity as child. + RegisteredMetric->RegisteredMetric + :return: None + """ + + # Register the edge + registered_entity = self.rabbitmq_client.register(self.edge_system) + + # Creating test Metric + test_metric = Metric( + name="Test_Metric", + unit=None, + interval=10, + aggregation_size=2, + sampling_function=sampling_function + ) + + registered_metric = self.rabbitmq_client.register(test_metric) + + with self.assertRaises(TypeError): + # Test case to check validation for RegisteredMetric as Parent and RegisteredEntity as child. + self.rabbitmq_client.create_relationship(registered_metric, registered_entity) + + def test_validation_create_relationship_child_entity(self): + """ + Test case to check validation for RegisteredEntity as Parent and Child. + RegisteredEdgeSystem->RegisteredEdgeSystem. + :return: None + """ + + # Register the edge + registered_entity = self.rabbitmq_client.register(self.edge_system) + + with self.assertRaises(TypeError): + # Creating the parent-child relationship between Edge-System and Edge-System + self.rabbitmq_client.create_relationship(registered_entity, registered_entity) + + @mock.patch("liota.lib.utilities.dcc_utility.get_formatted_data") + def test_format_data(self, mocked_get_formatted_data=None): + """ + Test the implementation of _format_data. + :param mocked_get_formatted_data: Mocked get_formatted_data from dcc_utilities + :return: None + """ + + # Assign return value to get_formatted_data + mocked_get_formatted_data.return_value = None + + # Creating test Metric + test_metric = Metric( + name="Test_Metric", + unit=None, + interval=10, + aggregation_size=2, + sampling_function=sampling_function + ) + + # Mock the get_formatted_data + with mock.patch("liota.dccs.rabbitmq.get_formatted_data") as mocked_get_formatted_data: + + registered_metric = self.rabbitmq_client.register(test_metric) + + # Call _format_data + self.rabbitmq_client._format_data(reg_metric=registered_metric) + + # Check get_formatted_data called with following + mocked_get_formatted_data.assert_called_with(registered_metric, False) + + def test_consume_implementation(self): + """ + Test the consumer method implementation. + :return: None + """ + + # Create Amap consumer messaging attributes + amqp_consumer_msg_attr = AmqpConsumeMessagingAttributes(exchange_name="test_exchange", + routing_keys=["test"]) + + # Create amqp client + with mock.patch.object(AmqpDccComms, "__init__") as mocked_amqp_init, \ + mock.patch.object(AmqpDccComms, "receive") as mocked_amqp_consume: + + mocked_amqp_init.return_value = None + + # Create mocked AmqpDCCComms object + amqp_client = AmqpDccComms() + + # Create RabbitMQ client object + self.rabbitmq_client = RabbitMQ(amqp_client) + + # Call consume method + self.rabbitmq_client.consume(amqp_consumer_msg_attr, test_callback) + + # Check mocked receive called with following params + mocked_amqp_consume.assert_called_with(amqp_consumer_msg_attr, test_callback) + + def test_stop_consumers(self): + """ + Test the stop_consumers implementation. + :return: None + """ + + # Create amqp client + with mock.patch.object(AmqpDccComms, "__init__") as mocked_amqp_init, \ + mock.patch.object(AmqpDccComms, "stop_receiving") as mocked_amqp_stop_receiving: + mocked_amqp_init.return_value = None + + # Create mocked AmqpDCCComms object + amqp_client = AmqpDccComms() + + # Create RabbitMQ client object + self.rabbitmq_client = RabbitMQ(amqp_client) + + # Call stop_consumers + self.rabbitmq_client.stop_consumers() + + # Call stop_receiving + mocked_amqp_stop_receiving.assert_called() + + def test_set_properties(self): + """ + Test the set_properties implementation. + :return: None + """ + # Check set_properties raising the NotImplementedError + self.assertRaises(NotImplementedError, self.rabbitmq_client.set_properties, None, None) + + def test_unregister(self): + """ + Test the unregister implementation. + :return: None + """ + # Check set_properties raising the NotImplementedError + self.assertRaises(NotImplementedError, self.rabbitmq_client.unregister, None) + +if __name__ == '__main__': + unittest.main(verbosity=1) diff --git a/tests/unit/lib/__init__.py b/tests/unit/lib/__init__.py new file mode 100644 index 00000000..4badd4a2 --- /dev/null +++ b/tests/unit/lib/__init__.py @@ -0,0 +1,31 @@ +# -*- coding: utf-8 -*- +# ----------------------------------------------------------------------------# +# Copyright © 2015-2016 VMware, Inc. All Rights Reserved. # +# # +# Licensed under the BSD 2-Clause License (the “License”); you may not use # +# this file except in compliance with the License. # +# # +# The BSD 2-Clause License # +# # +# Redistribution and use in source and binary forms, with or without # +# modification, are permitted provided that the following conditions are met:# +# # +# - Redistributions of source code must retain the above copyright notice, # +# this list of conditions and the following disclaimer. # +# # +# - Redistributions in binary form must reproduce the above copyright # +# notice, this list of conditions and the following disclaimer in the # +# documentation and/or other materials provided with the distribution. # +# # +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"# +# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE # +# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE # +# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE # +# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR # +# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF # +# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS # +# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN # +# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) # +# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF # +# THE POSSIBILITY OF SUCH DAMAGE. # +# ----------------------------------------------------------------------------# diff --git a/tests/unit/lib/utilities/__init__.py b/tests/unit/lib/utilities/__init__.py new file mode 100644 index 00000000..4badd4a2 --- /dev/null +++ b/tests/unit/lib/utilities/__init__.py @@ -0,0 +1,31 @@ +# -*- coding: utf-8 -*- +# ----------------------------------------------------------------------------# +# Copyright © 2015-2016 VMware, Inc. All Rights Reserved. # +# # +# Licensed under the BSD 2-Clause License (the “License”); you may not use # +# this file except in compliance with the License. # +# # +# The BSD 2-Clause License # +# # +# Redistribution and use in source and binary forms, with or without # +# modification, are permitted provided that the following conditions are met:# +# # +# - Redistributions of source code must retain the above copyright notice, # +# this list of conditions and the following disclaimer. # +# # +# - Redistributions in binary form must reproduce the above copyright # +# notice, this list of conditions and the following disclaimer in the # +# documentation and/or other materials provided with the distribution. # +# # +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"# +# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE # +# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE # +# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE # +# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR # +# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF # +# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS # +# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN # +# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) # +# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF # +# THE POSSIBILITY OF SUCH DAMAGE. # +# ----------------------------------------------------------------------------# diff --git a/tests/unit/lib/utilities/test_dcc_utilities.py b/tests/unit/lib/utilities/test_dcc_utilities.py new file mode 100644 index 00000000..6842d144 --- /dev/null +++ b/tests/unit/lib/utilities/test_dcc_utilities.py @@ -0,0 +1,459 @@ +# -*- coding: utf-8 -*- +# ----------------------------------------------------------------------------# +# Copyright © 2015-2016 VMware, Inc. All Rights Reserved. # +# # +# Licensed under the BSD 2-Clause License (the “License”); you may not use # +# this file except in compliance with the License. # +# # +# The BSD 2-Clause License # +# # +# Redistribution and use in source and binary forms, with or without # +# modification, are permitted provided that the following conditions are met:# +# # +# - Redistributions of source code must retain the above copyright notice, # +# this list of conditions and the following disclaimer. # +# # +# - Redistributions in binary form must reproduce the above copyright # +# notice, this list of conditions and the following disclaimer in the # +# documentation and/or other materials provided with the distribution. # +# # +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"# +# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE # +# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE # +# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE # +# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR # +# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF # +# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS # +# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN # +# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) # +# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF # +# THE POSSIBILITY OF SUCH DAMAGE. # +# ----------------------------------------------------------------------------# + +import unittest + +import mock +import pint + +from liota.dccs.aws_iot import AWSIoT +from liota.entities.devices.simulated_device import SimulatedDevice +from liota.entities.edge_systems.dell5k_edge_system import Dell5KEdgeSystem +from liota.entities.metrics.metric import Metric +from liota.lib.utilities.dcc_utility import * +from liota.lib.utilities.si_unit import parse_unit, UnsupportedUnitError +from liota.lib.utilities.utility import getUTCmillis + + +# Sampling function +def sampling_function(): + pass + + +def raise_exception(*args, **kwargs): + """ + Method to raise the UnsupportedUnitError + :return: None + """ + raise UnsupportedUnitError("UnsupportedUnitError") + + +# Create a pint unit registry +ureg = pint.UnitRegistry() + + +def validate_json(obj): + """ + Method to sort the provided json and returns back the sorted list representation of the json. + :param obj: json object + :return: sorted list of json + """ + if isinstance(obj, dict): + return sorted((k, validate_json(v)) for k, v in obj.items()) + if isinstance(obj, list): + return sorted(validate_json(x) for x in obj) + else: + return obj + + +class DCCUtilityTest(unittest.TestCase): + """ + Unit test cases for DCC Utility global functions + """ + + def setUp(self): + """ + Setup all required parameters required during the tests + :return: None + """ + # EdgeSystem name + self.edge_system = Dell5KEdgeSystem("TestEdgeSystem") + + # Mocking the DataCenterComponent init method + with mock.patch("liota.dccs.aws_iot.DataCenterComponent.__init__") as mocked_dcc: + self.aws = AWSIoT("Mocked DCC", enclose_metadata=True) + + def tearDown(self): + """ + Clean up all parameters used in test cases + :return: None + """ + self.edge_system = Dell5KEdgeSystem("TestEdgeSystem") + self.aws = None + + def test_implementation_get_formatted_data_no_data(self): + """ + Test case to check the implementation of _format_data for empty metric data. + :return: None + """ + + # Mocking the DataCenterComponent init method + with mock.patch("liota.dccs.aws_iot.DataCenterComponent.__init__") as mocked_dcc: + self.aws = AWSIoT("Mocked DCC", enclose_metadata=False) + + # Register the edge + registered_entity = self.aws.register(self.edge_system) + + # Creating test Metric + test_metric = Metric( + name="Test_Metric", + unit=None, + interval=10, + aggregation_size=2, + sampling_function=sampling_function + ) + + registered_metric = self.aws.register(test_metric) + + # Creating the parent-child relationship + self.aws.create_relationship(registered_entity, registered_metric) + + # Getting the parent child relationship array from registered metric + # formatted_data = self.aws._format_data(registered_metric) + formatted_data = get_formatted_data(registered_metric, False) + + # Check two dicts are equal or not + self.assertEqual(None, formatted_data, "Check implementation of get_formatted_data") + + def test_implementation_get_formatted_data_with_enclose_metadata(self): + """ + Test case to check the implementation of get_formatted_data method with enclose_metadata option of AWSIoT class. + RegisteredEdgeSystem->RegisteredMetric + :return: None + """ + + # Register the edge + registered_entity = self.aws.register(self.edge_system) + + # Creating test Metric + test_metric = Metric( + name="Test_Metric", + unit=None, + interval=10, + aggregation_size=2, + sampling_function=sampling_function + ) + + registered_metric = self.aws.register(test_metric) + + # Creating the parent-child relationship + self.aws.create_relationship(registered_entity, registered_metric) + + timestamp = getUTCmillis() + + registered_metric.values.put((timestamp, 10)) + + expected_output = { + "edge_system_name": registered_entity.ref_entity.name, + "metric_name": registered_metric.ref_entity.name, + "metric_data": [{ + "value": 10, + "timestamp": timestamp + }], + "unit": "null" + } + + # Getting the parent child relationship array from registered metric + formatted_data = get_formatted_data(registered_metric, True) + + formatted_json_data = json.loads(formatted_data) + + # Check two dicts are equal or not + self.assertEqual(validate_json(formatted_json_data) == validate_json(expected_output), True, + "Check implementation of get_formatted_data") + + def test_implementation_get_formatted_data_without_enclose_metatadata(self): + """ + Test case to check the output given by get_formatted_data method without enclosed meta_data option. + RegisteredEdgeSystem->RegisteredMetric + :return: None + """ + + # Mocking the DataCenterComponent init method + with mock.patch("liota.dccs.aws_iot.DataCenterComponent.__init__") as mocked_dcc: + self.aws = AWSIoT("Mocked DCC", enclose_metadata=False) + + # Register the edge + registered_entity = self.aws.register(self.edge_system) + + # Creating test Metric + test_metric = Metric( + name="Test_Metric", + unit=None, + interval=10, + aggregation_size=2, + sampling_function=sampling_function + ) + + registered_metric = self.aws.register(test_metric) + + # Creating the parent-child relationship + self.aws.create_relationship(registered_entity, registered_metric) + + # Get current timestamp + timestamp = getUTCmillis() + + registered_metric.values.put((timestamp, 10)) + + # Expected output without enclosed metadata + expected_output = { + "metric_name": registered_metric.ref_entity.name, + "metric_data": [{ + "value": 10, + "timestamp": timestamp + }], + "unit": "null" + } + + # Getting the parent child relationship array from registered metric + formatted_data = get_formatted_data(registered_metric, False) + + # Convert json string to dict for the comparision + formatted_json_data = json.loads(formatted_data) + + # Check two dicts are equal or not + self.assertEqual(validate_json(formatted_json_data) == validate_json(expected_output), True, + "Check implementation of get_formatted_data") + + def test_implementation_get_formatted_data_with_enclose_metadata_device(self): + """ + Test case to test the implementation of get_formatted_data method with enclose_metadata option. + RegisteredEdgeSystem->RegisteredDevice->RegisteredMetric + :return: None + """ + + # Register the edge + registered_entity = self.aws.register(self.edge_system) + + # Creating Simulated Device + test_sensor = SimulatedDevice("TestSensor") + + # Registering Device and creating Parent-Child relationship + reg_test_sensor = self.aws.register(test_sensor) + + self.aws.create_relationship(registered_entity, reg_test_sensor) + + # Creating test Metric + test_metric = Metric( + name="Test_Metric", + unit=ureg.degC, + interval=10, + aggregation_size=2, + sampling_function=sampling_function + ) + + registered_metric = self.aws.register(test_metric) + + # Creating the parent-child relationship + self.aws.create_relationship(reg_test_sensor, registered_metric) + + # Get current timestamp + timestamp = getUTCmillis() + + registered_metric.values.put((timestamp, 10)) + + # Expected output without enclosed metadata + expected_output = { + "edge_system_name": registered_entity.ref_entity.name, + "metric_name": registered_metric.ref_entity.name, + "device_name": reg_test_sensor.ref_entity.name, + "metric_data": [{ + "value": 10, + "timestamp": timestamp + }], + "unit": "null" + } + + unit_tuple = parse_unit(ureg.degC) + + if unit_tuple[0] is None: + # Base and Derived Units + expected_output['unit'] = unit_tuple[1] + else: + # Prefixed or non-SI Units + expected_output['unit'] = unit_tuple[0] + unit_tuple[1] + + # Getting the parent child relationship array from registered metric + formatted_data = get_formatted_data(registered_metric, True) + + # Convert json string to dict for the comparision + formatted_json_data = json.loads(formatted_data) + + # Check two dicts are equal or not + self.assertEqual(validate_json(formatted_json_data) == validate_json(expected_output), True, + "Check implementation of get_formatted_data") + + def test_implementation_get_formatted_data_with_enclose_metadata_device_exception_flow(self): + """ + Test case to test the implementation of get_formatted_data method with enclose_metadata option. + RegisteredEdgeSystem->RegisteredDevice->RegisteredMetric + :return: None + """ + + # Mock the get_formatted_data + with mock.patch("liota.lib.utilities.dcc_utility.parse_unit") as mocked_parse_unit: + # Method to be invoked on calling the parse_unit function + mocked_parse_unit.side_effect = raise_exception + + # Register the edge + registered_entity = self.aws.register(self.edge_system) + + # Creating Simulated Device + test_sensor = SimulatedDevice("TestSensor") + + # Registering Device and creating Parent-Child relationship + reg_test_sensor = self.aws.register(test_sensor) + + self.aws.create_relationship(registered_entity, reg_test_sensor) + + # Creating test Metric + test_metric = Metric( + name="Test_Metric", + unit=ureg.degC, + interval=10, + aggregation_size=2, + sampling_function=sampling_function + ) + + registered_metric = self.aws.register(test_metric) + + # Creating the parent-child relationship + self.aws.create_relationship(reg_test_sensor, registered_metric) + + # Get current timestamp + timestamp = getUTCmillis() + + registered_metric.values.put((timestamp, 10)) + + # Expected output without enclosed metadata + expected_output = { + "edge_system_name": registered_entity.ref_entity.name, + "metric_name": registered_metric.ref_entity.name, + "device_name": reg_test_sensor.ref_entity.name, + "metric_data": [{ + "value": 10, + "timestamp": timestamp + }], + "unit": "null" + } + + # Getting the parent child relationship array from registered metric + formatted_data = get_formatted_data(registered_metric, True) + + # Convert json string to dict for the comparision + formatted_json_data = json.loads(formatted_data) + + # Check two dicts are equal or not + self.assertEqual(validate_json(formatted_json_data) == validate_json(expected_output), True, + "Check implementation of get_formatted_data") + + def test_validation_get_entity_hierarchy(self): + """ + Test case to check the validation of _get_entity_hierarchy method for Metric object. + :return: None + """ + + # Creating test Metric + test_metric = Metric( + name="Test_Metric", + unit=None, + interval=10, + aggregation_size=2, + sampling_function=sampling_function + ) + + # Checking whether implementation raising the TypeError for invalid input + with self.assertRaises(TypeError): + get_entity_hierarchy(test_metric) + + def test_implementation_get_entity_hierarchy(self): + """ + Test case to check get_entity_entity_hierarchy() for RegisteredEdgeSystem->RegisteredMetric + :return: None + """ + + # Register the edge + registered_entity = self.aws.register(self.edge_system) + + # Creating test Metric + test_metric = Metric( + name="Test_Metric", + unit=None, + interval=10, + aggregation_size=2, + sampling_function=sampling_function + ) + + registered_metric = self.aws.register(test_metric) + + # Creating the parent-child relationship + self.aws.create_relationship(registered_entity, registered_metric) + + # Getting the parent child relationship array from registered metric + entity_hierarchy = get_entity_hierarchy(registered_metric) + + self.assertSequenceEqual([registered_entity.ref_entity.name, registered_metric.ref_entity.name], + entity_hierarchy, "Check the implementation of _get_entity_hierarchy for " + "RegisteredEdgeSystem->RegisteredMetric") + + def test_implementation_get_entity_hierarchy_device_metric(self): + """ + Test case to check get_entity_entity_hierarchy() for RegisteredEdgeSystem->RegisteredDevice->RegisteredMetric + :return: None + """ + + # Register the edge + registered_entity = self.aws.register(self.edge_system) + + # Creating Simulated Device + test_sensor = SimulatedDevice("TestSensor") + + # Registering Device and creating Parent-Child relationship + reg_test_sensor = self.aws.register(test_sensor) + + self.aws.create_relationship(registered_entity, reg_test_sensor) + + # Creating test Metric + test_metric = Metric( + name="Test_Metric", + unit=ureg.degC, + interval=10, + aggregation_size=2, + sampling_function=sampling_function + ) + + registered_metric = self.aws.register(test_metric) + + # Creating the parent-child relationship + self.aws.create_relationship(reg_test_sensor, registered_metric) + + # Getting the parent child relationship array from registered metric + entity_hierarchy = get_entity_hierarchy(registered_metric) + + self.assertSequenceEqual([registered_entity.ref_entity.name, reg_test_sensor.ref_entity.name, + registered_metric.ref_entity.name], entity_hierarchy, + "Check the implementation of _get_entity_hierarchy for " + "RegisteredEdgeSystem->RegisteredDevice->RegisteredMetric") + + +if __name__ == '__main__': + unittest.main(verbosity=1)