diff --git a/atest/Protocols.robot b/atest/Protocols.robot index 3875c6e..874e41f 100644 --- a/atest/Protocols.robot +++ b/atest/Protocols.robot @@ -171,4 +171,4 @@ Close client '${client_original}' and switch to client '${alternate_client}' Close server '${original_server}' and switch to server '${alternate_server}' Close Server name=${original_server} - Switch Server name=${alternate_server} \ No newline at end of file + Switch Server name=${alternate_server} diff --git a/atest/asynchronous_messages/asynchronous_msg.robot b/atest/asynchronous_messages/asynchronous_msg.robot index f77c2d6..f3aafb2 100644 --- a/atest/asynchronous_messages/asynchronous_msg.robot +++ b/atest/asynchronous_messages/asynchronous_msg.robot @@ -1,5 +1,6 @@ *** Settings *** Library Process +Library Collections Resource async_resources.robot Test Setup Setup protocol, nodes, and define templates Test teardown Teardown rammbock and increment port numbers @@ -65,20 +66,60 @@ Register an auto reply to work on background Wait until keyword succeeds 2s 0.1s Handler should have been called with '1' sample messages Message cache should be empty Timeout at background - [timeout] 3s + [timeout] 4s [Setup] Setup protocol, one client, background server, and define templates Send 10 messages every 0.5 seconds Load Template sample Reset received messages Set client handler my_handler.respond_to_sample header_filter=messageType Load Template another - Run keyword and expect error Timeout 0.6* Client receives message header_filter=messageType timeout=0.6 + Run keyword and expect error Timeout* Client receives message header_filter=messageType timeout=0.6 [Teardown] Get background results and reset +Two clients handling same message asynchronously without any effect from main message + [Documentation] This test is rather slow will handle multiple messages related to two clients at a time, so run with --exclude slow to skip this. + [Setup] Setup protocol, two clients, background server, and define templates Send 10 messages every 0.5 seconds using given connection + Load Template sample + Reset received messages + Set client handler my_handler.respond_to_sample name=client1 header_filter=messageType + Set client handler my_handler.respond_to_sample name=client2 header_filter=messageType + Load Template sample response + Client receives message name=client2 header_filter=messageType timeout=10 + Client receives message name=client1 header_filter=messageType timeout=10 + Client receives message name=client2 header_filter=messageType timeout=10 + Client receives message name=client1 header_filter=messageType timeout=10 + Client receives message name=client2 header_filter=messageType timeout=10 + sleep 2 + [Teardown] Get background results and reset +Get message using get message template keyword and perform send and receive operations with validation + [Setup] Setup protocol, server, two clients, and define templates + ${message_and_fields}= Get Message Template sample + client sends given message ${message_and_fields} name=ExampleClient1 + client sends given message ${message_and_fields} name=ExampleClient1 + ${message_and_fields}= Get Message Template sample response + Run Keyword And Expect Error timeout: timed out server receives given message ${message_and_fields} alias=Connection1 header_filter=messageType timeout=0.2 + ${message_and_fields}= Get Message Template sample + Set To Dictionary ${message_and_fields[1]} foo 12 + Run Keyword And Expect Error Value of field foo does not match 0x0001!=12 server receives given message ${message_and_fields} alias=Connection1 header_filter=messageType + ${message_and_fields}= Get Message Template sample + server receives given message ${message_and_fields} alias=Connection1 header_filter=messageType + ${message_and_fields}= Get Message Template sample response + server sends given message ${message_and_fields} connection=Connection2 + server sends given message ${message_and_fields} connection=Connection2 + ${message_and_fields}= Get Message Template sample + Run Keyword And Expect Error timeout: timed out client receives given message ${message_and_fields} name=ExampleClient2 header_filter=messageType timeout=0.2 + ${message_and_fields}= Get Message Template sample response + Set To Dictionary ${message_and_fields[1]} bar 12 + Run Keyword And Expect Error Value of field bar does not match 0x0064!=12 client receives given message ${message_and_fields} name=ExampleClient2 header_filter=messageType + ${message_and_fields}= Get Message Template sample response + client receives given message ${message_and_fields} name=ExampleClient2 header_filter=messageType + *** Variables *** ${SOURCEDIR}= ${CURDIR}${/}..${/}..${/}src ${BACKGROUND FILE}= ${CURDIR}${/}background_server.robot - +${PORT2}= 44488 *** Keywords *** + + Send receive another Load template another client sends message @@ -96,8 +137,22 @@ Setup protocol, one client, background server, and define templates Start background process ${background operation} Start TCP client 127.0.0.1 45555 name=client protocol=Example Wait Until Created ${SIGNAL FILE} timeout=10 seconds - sleep 0.1s # Just to make sure we dont get inbetween keywordcalls + sleep 0.2s # Just to make sure we dont get inbetween keywordcalls Connect 127.0.0.1 ${SERVER PORT} + +Setup protocol, two clients, background server, and define templates + [Arguments] ${background operation} + Define Example protocol + Define templates + Remove File ${SIGNAL FILE} + Start background process ${background operation} + sleep 0.1 + Start TCP client 127.0.0.1 ${CLIENT 1 PORT} name=client1 protocol=Example + Wait Until Created ${SIGNAL FILE} timeout=10 seconds + Connect 127.0.0.1 ${PORT2} + Start TCP client 127.0.0.1 ${CLIENT 2 PORT} name=client2 protocol=Example + Connect 127.0.0.1 ${PORT2} + Setup protocol, server, two clients, and define templates Define protocol, start tcp server and two clients protocol=Example Define templates @@ -129,4 +184,4 @@ Start background process ${process}= Start process python -m robot.run --test ${name} --loglevel DEBUG ... --variable BACKGROUND:True --variable PORT:${SERVER PORT} --pythonpath ${SOURCEDIR} ... --outputdir ${TEMPDIR} ${BACKGROUND FILE} - [Return] ${process} + [Return] ${process} \ No newline at end of file diff --git a/atest/asynchronous_messages/background_server.robot b/atest/asynchronous_messages/background_server.robot index eb8d4d2..0407257 100644 --- a/atest/asynchronous_messages/background_server.robot +++ b/atest/asynchronous_messages/background_server.robot @@ -8,6 +8,7 @@ Force tags background *** Variables *** ${BACKGROUND}= ${False} ${PORT}= 44455 +${PORT2}= 44488 *** Test Cases *** Serve on background @@ -19,7 +20,9 @@ Loop on background Send 10 messages every 0.5 seconds Run keyword if ${BACKGROUND} Send 10 messages every 0.5 seconds ... ELSE Set test documentation Skipped because not run on background. - +Send 10 messages every 0.5 seconds using given connection + Run keyword if ${BACKGROUND} Send 10 messages every 0.5 seconds using given connection + ... ELSE Set test documentation Skipped because not run on background. *** Keywords *** Serve on loop Setup connection @@ -35,7 +38,7 @@ Serve Receive another Send sample receive sample Send another - Sleep 5 + Sleep 10 Send 10 messages every 0.5 seconds Setup connection @@ -43,10 +46,6 @@ Send 10 messages every 0.5 seconds \ Send sample receive sample \ Sleep 0.5 -Send sample receive sample - Send sample - Receive sample response - Setup connection Define example protocol Define Templates @@ -54,6 +53,10 @@ Setup connection Touch ${SIGNAL FILE} Accept connection +Send sample receive sample + Send sample + Receive sample response + Send [arguments] ${message} Load template ${message} Server sends message @@ -62,4 +65,51 @@ Receive [arguments] ${message} Load template ${message} Server receives message header_filter=messageType +Setup connection with two clients + Define example protocol + Define Templates + Start TCP server 127.0.0.1 ${PORT2} name=ExampleServer protocol=Example + Touch ${SIGNAL FILE} + Accept connection alias=Connection1 + Accept connection alias=Connection2 + +Send 10 messages every 0.5 seconds using given connection + Setup connection with two clients + load template sample response + server sends message connection=Connection2 + :FOR ${i} IN RANGE 10 + \ Send sample and receive sample using connection1 + \ Sleep 0.01 + load template sample response + server sends message connection=Connection1 + :FOR ${i} IN RANGE 10 + \ Send sample and receive sample using connection2 + \ Sleep 0.01 + load template sample response + server sends message connection=Connection2 + :FOR ${i} IN RANGE 10 + \ Send sample and receive sample using connection2 + \ Sleep 0.01 + load template sample response + server sends message connection=Connection1 + :FOR ${i} IN RANGE 10 + \ Send sample and receive sample using connection1 + \ Sleep 0.01 + load template sample response + server sends message connection=Connection2 + +Send sample and receive sample using connection1 + Send message using given connection sample Connection1 + Receive message using given connection sample response Connection1 + +Send sample and receive sample using connection2 + Send message using given connection sample Connection2 + Receive message using given connection sample response Connection2 + +Send message using given connection [arguments] ${message} ${connection} + ${data}= get message template ${message} + Server sends given message ${data} connection=${connection} +Receive message using given connection [arguments] ${message} ${connection} + ${data}= get message template ${message} + Server receives given message ${data} alias=${connection} header_filter=messageType diff --git a/atest/asynchronous_messages/my_handler.py b/atest/asynchronous_messages/my_handler.py index 115434d..b2d36f1 100644 --- a/atest/asynchronous_messages/my_handler.py +++ b/atest/asynchronous_messages/my_handler.py @@ -1,9 +1,9 @@ from Rammbock import logger +import sys RECEIVED_MESSAGES = [] -SERVER_SENT = {'sample': 0, - 'another': 0} +SERVER_SENT = {'sample': 0, 'another': 0} def handle_sample(rammbock, msg): @@ -16,27 +16,19 @@ def reset_received_messages(): def respond_to_sample(rammbock, msg, client): + RECEIVED_MESSAGES.append(msg) foo = "adding Extra Variable to replicate ArgCount bug" bar = "adding Extra Variable to replicate ArgCount bug" - RECEIVED_MESSAGES.append(msg) - rammbock.save_template("__backup_template") - try: - rammbock.load_template("sample response") - rammbock.client_sends_message('name=%s' % client.name) - finally: - rammbock.load_template("__backup_template") + message_template = rammbock.get_message_template('sample response') + rammbock.client_sends_given_message(message_template, client.name) def server_respond_to_another_max_100(rammbock, msg, server, connection): RECEIVED_MESSAGES.append(msg) if SERVER_SENT['another'] < 100: SERVER_SENT['another'] = SERVER_SENT['another'] + 1 - rammbock.save_template("__backup_template") - try: - rammbock.load_template("another") - rammbock.server_sends_message('name=%s' % server.name, 'connection=%s' % connection.name) - finally: - rammbock.load_template("__backup_template") + message_template = rammbock.get_message_template('another') + rammbock.server_sends_given_message(message_template, server.name, connection.name) else: logger.warn("Reached 100 in another") @@ -45,12 +37,8 @@ def server_respond_to_sample_response_max_100(rammbock, msg): RECEIVED_MESSAGES.append(msg) if SERVER_SENT['sample'] < 100: SERVER_SENT['sample'] = SERVER_SENT['sample'] + 1 - rammbock.save_template("__backup_template") - try: - rammbock.load_template("sample") - rammbock.server_sends_message() - finally: - rammbock.load_template("__backup_template") + message_template = rammbock.get_message_template('sample') + rammbock.server_sends_given_message(message_template) else: logger.warn("Reached 100 in sample") diff --git a/src/Rammbock/core.py b/src/Rammbock/core.py index 95317af..14dd23f 100644 --- a/src/Rammbock/core.py +++ b/src/Rammbock/core.py @@ -16,7 +16,6 @@ import copy from contextlib import contextmanager from .logger import logger -from .synchronization import SynchronizedType from .templates.containers import BagTemplate, CaseTemplate from .message import _StructuredElement from .networking import (TCPServer, TCPClient, UDPServer, UDPClient, SCTPServer, @@ -33,8 +32,6 @@ class RammbockCore(object): ROBOT_LIBRARY_SCOPE = 'GLOBAL' - __metaclass__ = SynchronizedType - def __init__(self): self._init_caches() @@ -75,21 +72,17 @@ def set_client_handler(self, handler_func, name=None, header_filter=None, interv be called on background. By default the incoming messages are checked every 0.5 seconds. - The handler function will be called with two arguments: the rammbock library - instance and the received message. + The handler function will be called with three arguments: the rammbock library + instance, received message and the client instance. Example: | Load template | SomeMessage | | Set client handler | my_module.respond_to_sample | my_module.py: - | def respond_to_sample(rammbock, msg): - | rammbock.save_template("__backup_template", unlocked=True) - | try: - | rammbock.load_template("sample response") - | rammbock.client_sends_message() - | finally: - | rammbock.load_template("__backup_template") + | def respond_to_sample(rammbock, msg, client): + | message_template = rammbock.get_message_template('sample response') + | rammbock.client_sends_given_message(message_template, client.name) """ msg_template = self._get_message_template() client, client_name = self._clients.get_with_name(name) @@ -113,21 +106,17 @@ def set_server_handler(self, handler_func, name=None, header_filter=None, alias= The alias is the alias for the connection. By default the current active connection will be used. - The handler function will be called with two arguments: the rammbock library - instance and the received message. + The handler function will be called with four arguments: the rammbock library + instance, received message, server instance and connection instance. Example: | Load template | SomeMessage | | Set server handler | my_module.respond_to_sample | messageType | my_module.py: - | def respond_to_sample(rammbock, msg): - | rammbock.save_template("__backup_template", unlocked=True) - | try: - | rammbock.load_template("sample response") - | rammbock.server_sends_message() - | finally: - | rammbock.load_template("__backup_template") + | def respond_to_sample(rammbock, msg, server, connection): + | message_template = rammbock.get_message_template('sample response') + | rammbock.server_sends_given_message(message_template, server.name, connection.name) """ msg_template = self._get_message_template() server, server_name = self._servers.get_with_name(name) @@ -341,7 +330,7 @@ def _register_receive(self, receiver, label, name, error='', connection=None): self._message_sequence.receive(name, receiver.get_own_address(), receiver.get_peer_address(alias=connection), receiver.protocol_name, label, error) - def client_sends_binary(self, message, name=None, label=None): + def client_sends_binary(self, message, name=None, label=None, connection=None): """Send raw binary `message`. If client `name` is not given, uses the latest client. Optional message @@ -469,6 +458,18 @@ def load_template(self, name, *parameters): template, fields, header_fields = self._set_templates_fields_and_header_fields(name, parameters) self._init_new_message_stack(template, fields, header_fields) + def get_message_template(self, name, *parameters): + """Returns the template and its respective fileds which can be used to set the + We can use the this data to send the message without using the load template. + + Useful to handle messages related to multiple clients or servers when any client + or server is waiting to receive a message. + Examples: + |${message_template} = | get Template | MyMessage | + """ + template, fields, header_fields = self._set_templates_fields_and_header_fields(name, parameters) + return template, fields, header_fields + def load_copy_of_template(self, name, *parameters): """Load a copy of message template saved with `Save template` when originally saved values need to be preserved from test to test. @@ -504,7 +505,11 @@ def get_message(self, *parameters): return self._encode_message(message_fields, header_fields) def _encode_message(self, message_fields, header_fields): - msg = self._get_message_template().encode(message_fields, header_fields) + msg = self._encode_given_message(self._get_message_template(), message_fields, header_fields) + return msg + + def _encode_given_message(self, template, message_fields, header_fields): + msg = template.encode(message_fields, header_fields) logger.debug('%s' % repr(msg)) return msg @@ -528,6 +533,17 @@ def client_sends_message(self, *parameters): """ self._send_message(self.client_sends_binary, parameters) + def client_sends_given_message(self, message_template, name=None, *parameters): + """Send a message which is retrieved using `get message template` keyword + Parameter `name` separated with equals and message is mandatory. + + Examples: + |${retrieved_message_template} = | Get Message Template | sample | + | Client sends given message | ${retrieved_message_template} | name=Client1 | + """ + template, fields, header_fields = message_template + self._send_given_message(self.client_sends_binary, template, fields, header_fields, name, parameters) + # FIXME: support "send to" somehow. A new keyword? def server_sends_message(self, *parameters): """Send a message defined with `New Message`. @@ -543,11 +559,28 @@ def server_sends_message(self, *parameters): """ self._send_message(self.server_sends_binary, parameters) + def server_sends_given_message(self, message_template, fields={}, header_fields={}, name=None, connection=None, *parameters): + """Send a message which is retrieved using `get message template` keyword + Parameter `name` separated with equals and message is mandatory. + + Examples: + | ${retrieved_message_template} = | Get message template | sample | + | Server sends given message | ${retrieved_message_template} | name=server1 | connection=Connection1 | + """ + template, fields, header_fields = message_template + self._send_given_message(self.server_sends_binary, template, fields, header_fields, node_name=name, connection_name=connection) + def _send_message(self, callback, parameters): configs, message_fields, header_fields = self._get_parameters_with_defaults(parameters) msg = self._encode_message(message_fields, header_fields) + logger.debug("sending message %s" % msg) callback(msg._raw, label=self._current_container.name, **configs) + def _send_given_message(self, callback, template, message_fields, header_fields, node_name=None, connection_name=None): + configs, message_fields, header_fields = self._get_parameters_with_given_data(message_fields, header_fields, []) + msg = self._encode_given_message(template, message_fields, header_fields) + callback(msg._raw, label=template.name, name=node_name, connection=connection_name) + def client_receives_message(self, *parameters): """Receive a message with template defined using `New Message` and validate field values. @@ -570,6 +603,29 @@ def client_receives_message(self, *parameters): self._validate_message(msg, message_fields, header_fields) return msg + def client_receives_given_message(self, message_template, *parameters): + """Receive a message with template retrieved using `Get Message Template` and + validate field values. + Message template has to be retrieved with `Get Message Template` before calling + this. + + Mandatory parameters: + - `name` example: `name=client1` + + Optional parameters: + - `timeout` for receiving message. example: `timeout=0.1` + - `latest` if set to True, get latest message from buffer instead first. Default is False. Example: `latest=True` + + Examples: + | ${retrieved_message_template} = | Get message template | sample | + | ${msg} = | Client receives given message | ${retrieved_message_template} | name=Client1 | + | ${msg} = | Client receives given message | ${retrieved_message_template} | name=Client1 | timeout=5 | + """ + template, message_fields, header_fields = message_template + with self._receive_message_using_given_template(self._clients, template, message_fields, header_fields, *parameters) as (msg, message_fields, header_fields): + self._validate_message_with_given_template(template, msg, message_fields, header_fields) + return msg + def client_receives_without_validation(self, *parameters): """Receive a message with template defined using `New Message`. @@ -615,6 +671,31 @@ def server_receives_message(self, *parameters): self._validate_message(msg, message_fields, header_fields) return msg + def server_receives_given_message(self, message_template, *parameters): + """Receive a message with template retrieved using `Get Message Template` and + validate field values. + + Message template has to be retrieved with `Get Message Template` before calling + this. + + Mandatory parameters: + - `connection` alias. example: `connection=connection 1` + + Optional parameters: + - `name` the server name example: `name=Server1` + - `timeout` for receiving message. example: `timeout=0.1` + - `latest` if set to True, get latest message from buffer instead first. Default is False. Example: `latest=True` + + Examples: + | ${retrieved_message_template} = | Get message template | sample | + | ${msg} = | Server receives given message | ${retrieved_message_template} | name=Server1 | alias=my_connection | + | ${msg} = | Server receives given message | ${retrieved_message_template} | name=Server1 | alias=my_connection | timeout=5 | + """ + template, message_fields, header_fields = message_template + with self._receive_message_using_given_template(self._servers, template, message_fields, header_fields, *parameters) as (msg, message_fields, header_fields): + self._validate_message_with_given_template(template, msg, message_fields, header_fields) + return msg + def server_receives_without_validation(self, *parameters): """Receive a message with template defined using `New Message`. @@ -652,6 +733,13 @@ def _validate_message(self, msg, message_fields, header_fields): logger.info('\n'.join(errors)) raise AssertionError(errors[0]) + def _validate_message_with_given_template(self, template, msg, message_fields, header_fields): + errors = template.validate(msg, message_fields, header_fields) + if errors: + logger.info("Validation failed for %s" % repr(msg)) + logger.info('\n'.join(errors)) + raise AssertionError(errors[0]) + @contextmanager def _receive(self, nodes, *parameters): configs, message_fields, header_fields = self._get_parameters_with_defaults(parameters) @@ -665,6 +753,19 @@ def _receive(self, nodes, *parameters): self._register_receive(node, self._current_container.name, name, error=e.args[0]) raise e + @contextmanager + def _receive_message_using_given_template(self, nodes, template, message_fields, header_fields, *parameters): + configs, message_fields, header_fields = self._get_parameters_with_given_data(message_fields, header_fields, parameters) + node, name = nodes.get_with_name(configs.pop('name', None)) + msg = node.get_message(template, **configs) + try: + yield msg, message_fields, header_fields + self._register_receive(node, self._current_container.name, name) + logger.debug("Received %s" % repr(msg)) + except AssertionError, e: + self._register_receive(node, self._current_container.name, name, error=e.args[0]) + raise e + def uint(self, length, name, value=None, align=None): """Add an unsigned integer to template. @@ -720,7 +821,7 @@ def _add_field(self, field): raise AssertionError('Adding fields to message loaded with Load template is not allowed') self._current_container.add(field) - def new_struct(self, type, name, *parameters): + def new_struct(self, struct_type, name, *parameters): """Defines a new struct to template. You must call `End Struct` to end struct definition. `type` is the name @@ -739,7 +840,7 @@ def new_struct(self, type, name, *parameters): """ configs, parameters, _ = self._get_parameters_with_defaults(parameters) self._add_struct_name_to_params(name, parameters) - self._message_stack.append(StructTemplate(type, name, self._current_container, parameters, length=configs.get('length'), align=configs.get('align'))) + self._message_stack.append(StructTemplate(struct_type, name, self._current_container, parameters, length=configs.get('length'), align=configs.get('align'))) def _add_struct_name_to_params(self, name, parameters): for param_key in parameters.keys(): @@ -778,8 +879,8 @@ def _new_list(self, size, name): def _end_list(self): """End list definition. See `New List`. """ - list = self._message_stack.pop() - self._add_field(list) + list_data = self._message_stack.pop() + self._add_field(list_data) def new_binary_container(self, name): """Defines a new binary container to template. @@ -820,7 +921,7 @@ def bin(self, size, name, value=None): def tbcd(self, size, name, value=None): self._add_field(TBCD(size, name, value)) - def new_union(self, type, name): + def new_union(self, union_type, name): """Defines a new union to template of `type` and `name`. Fields inside the union are alternatives and the length of the union is @@ -832,7 +933,7 @@ def new_union(self, type, name): | u32 | int | | End union | """ - self._message_stack.append(UnionTemplate(type, name, self._current_container)) + self._message_stack.append(UnionTemplate(union_type, name, self._current_container)) def end_union(self): """End union definition. See `New Union`. @@ -927,6 +1028,12 @@ def _get_parameters_with_defaults(self, parameters): headers = self._populate_defaults(headers, self._header_values) return config, fields, headers + def _get_parameters_with_given_data(self, message_fields, header_fields, parameters): + config, fields, headers = self._parse_parameters(parameters or []) + fields = self._populate_defaults(fields, message_fields or {}) + headers = self._populate_defaults(headers, header_fields or {}) + return config, fields, headers + def _populate_defaults(self, fields, default_values): ret_val = default_values ret_val.update(fields) @@ -972,7 +1079,7 @@ def _get_headers(self, fields): return headers, fields def _to_dict(self, *lists): - return (dict(list) for list in lists) + return (dict(data_list) for data_list in lists) def _parse_entry(self, param, configs, fields): colon_index = param.find(':') diff --git a/src/Rammbock/networking.py b/src/Rammbock/networking.py index 5b4950c..710f133 100644 --- a/src/Rammbock/networking.py +++ b/src/Rammbock/networking.py @@ -18,6 +18,7 @@ from .logger import logger from .synchronization import SynchronizedType from .binary_tools import to_hex +import threading try: from sctp import sctpsocket_tcp @@ -59,6 +60,9 @@ class _NetworkNode(_WithTimeouts): parent = None name = '' + def __init__(self): + self._lock = threading.RLock() + def set_handler(self, msg_template, handler_func, header_filter, alias=None, interval=None): if alias: raise AssertionError('Named connections not supported.') @@ -84,7 +88,7 @@ def close(self): def _get_message_stream(self): if not self._protocol: return None - return self._protocol.get_message_stream(BufferedStream(self, self._default_timeout)) + return self._protocol.get_message_stream(BufferedStream(self, self._default_timeout), self._lock) def get_message(self, message_template, timeout=None, header_filter=None, latest=None): if not self._protocol: @@ -293,12 +297,12 @@ def get_peer_address(self, alias=None): class _TCPConnection(_NetworkNode, _TCPNode): def __init__(self, parent, socket, protocol=None): + _NetworkNode.__init__(self) self.parent = parent self._socket = socket self._protocol = protocol self._message_stream = self._get_message_stream() self._is_connected = True - _NetworkNode.__init__(self) class SCTPServer(StreamServer, _SCTPNode): @@ -312,12 +316,12 @@ class TCPServer(StreamServer, _TCPNode): class _Client(_NetworkNode): def __init__(self, timeout=None, protocol=None, family=None): + _NetworkNode.__init__(self) self._is_connected = False self._init_socket(family) self._set_default_timeout(timeout) self._protocol = protocol self._message_stream = None - _NetworkNode.__init__(self) def set_own_ip_and_port(self, ip=None, port=None): if ip and port: diff --git a/src/Rammbock/synchronization.py b/src/Rammbock/synchronization.py index 82bd5b2..94798fb 100644 --- a/src/Rammbock/synchronization.py +++ b/src/Rammbock/synchronization.py @@ -1,16 +1,11 @@ -import threading - from .decorator import decorator -LOCK = threading.RLock() - - @decorator -def synchronized(f, *args, **kw): +def synchronized(f, node, *args, **kw): """ Synchronization decorator """ - with LOCK: - return f(*args, **kw) + with node._lock: + return f(node, *args, **kw) class SynchronizedType(type): diff --git a/src/Rammbock/templates/containers.py b/src/Rammbock/templates/containers.py index c08c66d..e6b49e4 100644 --- a/src/Rammbock/templates/containers.py +++ b/src/Rammbock/templates/containers.py @@ -192,8 +192,8 @@ def read(self, stream, timeout=None): pdu_bytes = stream.read(length) return header, pdu_bytes - def get_message_stream(self, buffered_stream): - return MessageStream(buffered_stream, self) + def get_message_stream(self, buffered_stream, lock): + return MessageStream(buffered_stream, self, lock) class MessageTemplate(_Template): diff --git a/src/Rammbock/templates/message_stream.py b/src/Rammbock/templates/message_stream.py index 2a40d29..38f1762 100644 --- a/src/Rammbock/templates/message_stream.py +++ b/src/Rammbock/templates/message_stream.py @@ -18,12 +18,11 @@ from Rammbock.logger import logger from Rammbock.binary_tools import to_bin, to_int -from Rammbock.synchronization import LOCK class MessageStream(object): - def __init__(self, stream, protocol): + def __init__(self, stream, protocol, lock): self._cache = [] self._stream = stream self._protocol = protocol @@ -31,6 +30,7 @@ def __init__(self, stream, protocol): self._handler_thread = None self._running = True self._interval = 0.5 + self._lock = lock def close(self): self._running = False @@ -56,7 +56,7 @@ def get(self, message_template, timeout=None, header_filter=None, latest=None): return msg cutoff = time.time() + float(timeout if timeout else 0) while not timeout or time.time() < cutoff: - with LOCK: + with self._lock: header, pdu_bytes = self._protocol.read(self._stream, timeout=timeout) if self._matches(header, header_fields, header_filter): return self._to_msg(message_template, header, pdu_bytes) @@ -140,7 +140,7 @@ def match_handlers_periodically(self): def match_handlers(self): try: while True: - with LOCK: + with self._lock: self._try_matching_cached_to_templates() header, pdu_bytes = self._protocol.read(self._stream, timeout=0.01) self._match_or_cache(header, pdu_bytes) diff --git a/utest/test_networking.py b/utest/test_networking.py index 61fc773..965da05 100644 --- a/utest/test_networking.py +++ b/utest/test_networking.py @@ -149,15 +149,6 @@ def test_setting_server_default_timeout(self): server, _ = self._udp_server_and_client(ports['SERVER_PORT'], ports['CLIENT_PORT'], timeout=0.1) self._assert_timeout(server) - @contextmanager - def _without_sync(self): - original_LOCK = synchronization.LOCK - try: - synchronization.LOCK = Semaphore(100) - yield - finally: - synchronization.LOCK = original_LOCK - @contextmanager def _client_and_server(self, port): server = TCPServer(LOCAL_IP, port) @@ -169,19 +160,17 @@ def _client_and_server(self, port): client.close() def test_connection_timeout(self): - with self._without_sync(): - with self._client_and_server(ports['SERVER_PORT']) as (client, server): - timer_obj = Timer(0.1, client.connect_to, [LOCAL_IP, ports['SERVER_PORT']]) - timer_obj.start() - server.accept_connection(timeout="0.5") + with self._client_and_server(ports['SERVER_PORT']) as (client, server): + timer_obj = Timer(0.1, client.connect_to, [LOCAL_IP, ports['SERVER_PORT']]) + timer_obj.start() + server.accept_connection(timeout="0.5") def test_connection_timeout_failure(self): - with self._without_sync(): - with self._client_and_server(ports['SERVER_PORT']) as (client, server): - timer_obj = Timer(0.2, client.connect_to, [LOCAL_IP, ports['SERVER_PORT']]) - timer_obj.start() - self.assertRaises(socket.timeout, server.accept_connection, timeout=0.1) - timer_obj.cancel() + with self._client_and_server(ports['SERVER_PORT']) as (client, server): + timer_obj = Timer(0.2, client.connect_to, [LOCAL_IP, ports['SERVER_PORT']]) + timer_obj.start() + self.assertRaises(socket.timeout, server.accept_connection, timeout=0.1) + timer_obj.cancel() # FIXME: this deadlocks def xtest_blocking_timeout(self): diff --git a/utest/test_templates/test_endianness.py b/utest/test_templates/test_endianness.py index fe40e2b..7f78227 100644 --- a/utest/test_templates/test_endianness.py +++ b/utest/test_templates/test_endianness.py @@ -4,6 +4,7 @@ from Rammbock.binary_tools import to_bin from .tools import * from Rammbock.templates.message_stream import MessageStream +import threading class TestLittleEndian(TestCase): @@ -51,7 +52,7 @@ def test_encode_little_endian_header(self): def test_decode_little_endian_header(self): byte_stream = MockStream(to_bin('0x0500 0800 cafe babe')) - self._msg_stream = MessageStream(byte_stream, self._protocol) + self._msg_stream = MessageStream(byte_stream, self._protocol, threading.RLock()) decoded = self._msg_stream.get(self.tmp) self.assertEquals(decoded._header.msgId.hex, '0x0005') self.assertEquals(decoded._header.msgId._raw, to_bin('0x0500')) diff --git a/utest/test_templates/test_message_stream.py b/utest/test_templates/test_message_stream.py index ff64e0e..9686f72 100644 --- a/utest/test_templates/test_message_stream.py +++ b/utest/test_templates/test_message_stream.py @@ -4,6 +4,7 @@ from Rammbock.templates.message_stream import MessageStream from Rammbock.templates import Protocol, MessageTemplate, UInt, PDU from Rammbock.binary_tools import to_bin +import threading class TestProtocolMessageReceiving(TestCase): @@ -32,7 +33,7 @@ def setUp(self): self._msg.add(UInt(1, 'field_1', None)) self._msg.add(UInt(1, 'field_2', None)) byte_stream = MockStream(to_bin('0xff0004cafe aa0004dead dd0004beef')) - self._msg_stream = MessageStream(byte_stream, self._protocol) + self._msg_stream = MessageStream(byte_stream, self._protocol, threading.RLock()) def test_get_message(self): msg = self._msg_stream.get(self._msg, header_filter='id') diff --git a/win_execute_regression_tests.cmd b/win_execute_regression_tests.cmd index a559b3f..5c88880 100644 --- a/win_execute_regression_tests.cmd +++ b/win_execute_regression_tests.cmd @@ -2,5 +2,5 @@ set BASE=%~dp0 set DEFAULT_TARGET= IF [%1]==[] set DEFAULT_TARGET="%BASE%atest" -pybot -c regression -L DEBUG --pythonpath "%BASE%src" %DEFAULT_TARGET% %* +pybot -c regression -L TRACE --pythonpath "%BASE%src" %DEFAULT_TARGET% %* pause