Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 20 additions & 1 deletion doipclient/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -601,9 +601,28 @@ def request_activation(
),
)
self.send_doip_message(message, disable_retry=disable_retry)
timeout = A_PROCESSING_TIME
start_time = time.time()
while True:
result = self.read_doip()
ellapsed_time = time.time() - start_time
if ellapsed_time > timeout:
raise TimeoutError("Timed out waiting for routing activation response")
result = self.read_doip(timeout=(timeout - ellapsed_time))
if isinstance(result, RoutingActivationResponse):
if result.client_logical_address != self._client_logical_address:
logger.warning(
"Routing Activation Response with invalid client logical address, multiple clients detected. Expected: 0x{:04X}, Got: 0x{:04X}. Ignoring".format(
self._client_logical_address, result.client_logical_address
)
)
continue
if result.logical_address != self._ecu_logical_address:
logger.warning(
"Routing Activation Response with invalid ECU logical address, multiple ECUs detected. Expected: 0x{:04X}, Got: 0x{:04X}. Ignoring".format(
self._ecu_logical_address, result.logical_address
)
)
continue
return result
if result:
logger.warning(
Expand Down
62 changes: 57 additions & 5 deletions tests/test_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,23 +26,35 @@
successful_activation_response = bytearray(
[
int(x, 16)
for x in "02 fd 00 06 00 00 00 09 0e 00 00 37 10 00 00 00 00".split(" ")
for x in "02 fd 00 06 00 00 00 09 0e 00 00 01 10 00 00 00 00".split(" ")
]
)
unsuccessful_activation_response = bytearray(
[
int(x, 16)
for x in "02 fd 00 06 00 00 00 09 0e 00 00 37 00 00 00 00 00".split(" ")
for x in "02 fd 00 06 00 00 00 09 0e 00 00 01 00 00 00 00 00".split(" ")
]
)
successful_activation_response_with_vm = bytearray(
[
int(x, 16)
for x in "02 fd 00 06 00 00 00 0d 0e 00 00 37 10 00 00 00 00 04 03 02 01".split(
for x in "02 fd 00 06 00 00 00 0d 0e 00 00 01 10 00 00 00 00 04 03 02 01".split(
" "
)
]
)
invalid_client_logical_address_activation_response = bytearray(
[
int(x, 16)
for x in "02 fd 00 06 00 00 00 09 0e 01 00 01 10 00 00 00 00".split(" ")
]
)
invalid_ecu_logical_address_activation_response = bytearray(
[
int(x, 16)
for x in "02 fd 00 06 00 00 00 09 0e 00 00 02 10 00 00 00 00".split(" ")
]
)
nack_response = bytearray([int(x, 16) for x in "02 fd 00 00 00 00 00 01 04".split(" ")])
alive_check_request = bytearray(
[int(x, 16) for x in "02 fd 00 07 00 00 00 00".split(" ")]
Expand Down Expand Up @@ -416,7 +428,7 @@ def test_send_good_activation_request(mock_socket):
assert mock_socket._bound_port == None
assert mock_socket.tx_queue[-1] == activation_request
assert result.client_logical_address == 0x0E00
assert result.logical_address == 55
assert result.logical_address == 1
assert result.response_code == 16
assert result.vm_specific is None

Expand All @@ -427,11 +439,51 @@ def test_send_good_activation_request_with_vm(mock_socket):
result = sut.request_activation(0, 0x01020304)
assert mock_socket.tx_queue[-1] == activation_request_with_vm
assert result.client_logical_address == 0x0E00
assert result.logical_address == 55
assert result.logical_address == 1
assert result.response_code == 16
assert result.vm_specific == 0x04030201


def test_activation_ignores_mismatched_client_logical_address(mock_socket):
sut = DoIPClient(test_ip, test_logical_address, activation_type=None)
mock_socket.rx_queue = [
invalid_client_logical_address_activation_response,
successful_activation_response,
]

result = sut.request_activation(0)

assert result.client_logical_address == 0x0E00
assert result.logical_address == 1
assert result.response_code == 16


def test_activation_ignores_mismatched_ecu_logical_address(mock_socket):
sut = DoIPClient(test_ip, test_logical_address, activation_type=None)
mock_socket.rx_queue = [
invalid_ecu_logical_address_activation_response,
successful_activation_response,
]

result = sut.request_activation(0)

assert result.client_logical_address == 0x0E00
assert result.logical_address == 1
assert result.response_code == 16


def test_activation_times_out_with_only_mismatched_addresses(mock_socket, mocker):
sut = DoIPClient(test_ip, test_logical_address, activation_type=None)
mocker.patch("doipclient.client.A_PROCESSING_TIME", 0.01)
mock_socket.rx_queue = [
invalid_client_logical_address_activation_response,
invalid_ecu_logical_address_activation_response,
]

with pytest.raises(TimeoutError, match=r"ECU failed to respond in time"):
sut.request_activation(0)


def test_activation_with_nack(mock_socket):
sut = DoIPClient(test_ip, test_logical_address)
mock_socket.rx_queue.append(nack_response)
Expand Down
Loading