From ad25c0d451775910aa57471663d7e04777375100 Mon Sep 17 00:00:00 2001 From: Yuelin Zhao Date: Thu, 21 May 2026 23:57:17 +0000 Subject: [PATCH] fix(bug-bash): TLS 1.3 follow-ups for 0.31.0b1 Three bug-bash regressions discovered during the 0.31.0b1 preview cycle: #7 az iot dps linked-hub create: reject re-linking the same hub under a different hostname type or authentication method. Previously silently created duplicate iotHubs entries. Adds a single _linked_hub_hostname helper shared with _warn_mixed_endpoint_types (refactor). #8 az iot hub {device,module}-identity connection-string show: reject --hostname-type service up-front. Devices and modules cannot authenticate against the service endpoint. #9 az iot hub generate-sas-token: add --hostname-type and produce audience-correct SAS tokens for TLS 1.3 hubs. - Hub-level scope defaults to 'auto' = service endpoint on GWv2. - Device/module scope defaults to 'auto' = device endpoint on GWv2. - service hostname-type is rejected for device/module scopes. Tests ----- Unit: +23 SAS hostname-type permutation tests, +3 CS service-rejection tests, +9 dup-link guard tests, +7 _linked_hub_hostname helper tests. 332 unit tests pass on the touched files. Int: +1 DPS dup-link test, +1 hub-level SAS hostname-type test, +1 device-scope CS/SAS permutation test, +1 module-scope test. flake8 clean across the whole package. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- HISTORY.rst | 12 + azext_iot/_help.py | 6 + azext_iot/_params.py | 15 + azext_iot/core/custom.py | 40 ++- azext_iot/operations/hub.py | 39 ++- .../tests/dps/core/test_dps_linked_hub_int.py | 46 +++ .../dps/core/test_dps_linked_hub_unit.py | 178 +++++++++- .../iothub/core/test_iothub_utilities_int.py | 64 ++++ .../iothub/core/test_iothub_utilities_unit.py | 312 ++++++++++++++++++ .../iothub/devices/test_iothub_devices_int.py | 94 ++++++ .../iothub/modules/test_iothub_modules_int.py | 97 ++++++ 11 files changed, 890 insertions(+), 13 deletions(-) diff --git a/HISTORY.rst b/HISTORY.rst index 3debca45e..859969631 100644 --- a/HISTORY.rst +++ b/HISTORY.rst @@ -33,6 +33,18 @@ Release History * For managed-identity links, the linked hub's hostname is auto-resolved from the IoT Hub's ``deviceHostName`` (with classic fallback for V1 hubs). CLI validates that the appropriate identity type is enabled on the DPS resource before creating a managed-identity link. +**Bug fixes (0.31.0b1 bug bash follow-ups)** + +* ``az iot dps linked-hub create`` now rejects linking the same IoT Hub more than once to the same DPS, regardless of hostname type or authentication method. Previously the second link silently created a duplicate ``iotHubs`` entry under a different hostname (e.g. classic vs ``.device.``), leaving the DPS in an ambiguous state. Use ``az iot dps linked-hub delete`` to remove the existing link before re-linking. + +* ``az iot hub device-identity connection-string show`` and ``az iot hub module-identity connection-string show`` now reject ``--hostname-type service``. Devices and modules cannot authenticate against the service endpoint; use ``auto``, ``device`` or ``classic`` instead. + +* ``az iot hub generate-sas-token`` gains a ``--hostname-type`` parameter and now produces audience-correct SAS tokens for TLS 1.3 hubs: + + - Hub-level SAS (no ``-d``) defaults to ``--hostname-type auto`` which resolves to the service hostname on GWv2 hubs and the classic hostname on V1 hubs. ``service``, ``device`` and ``classic`` are explicitly selectable. + - Device-level SAS (``-d``) and module-level SAS (``-d -m``) default to ``auto`` which resolves to the device hostname on GWv2 hubs and the classic hostname on V1 hubs. ``--hostname-type service`` is rejected. + - **Behavior change:** on GWv2 hubs the ``sr=`` audience for device/module SAS tokens now points at the device endpoint (``.device.azure-devices.net/devices/...``). Hub-side enforcement of audience-to-endpoint match is rolling out, so consumers should treat this as the correct default. + 0.29.0 +++++++++++++++ diff --git a/azext_iot/_help.py b/azext_iot/_help.py index d730fb43e..2b9dbe1ef 100644 --- a/azext_iot/_help.py +++ b/azext_iot/_help.py @@ -689,6 +689,12 @@ text: > az iot hub generate-sas-token --connection-string 'HostName=myhub.azure-devices.net;DeviceId=mydevice;ModuleId=mymodule;SharedAccessKeyName=iothubowner;SharedAccessKey=12345' + - name: Generate a device SAS token whose audience targets the TLS 1.3 device endpoint of a GWv2 hub. + text: > + az iot hub generate-sas-token -d {device_id} -n {iothub_name} --hostname-type device + - name: Generate an IoT Hub SAS token whose audience explicitly targets the TLS 1.3 service endpoint. + text: > + az iot hub generate-sas-token -n {iothub_name} --hostname-type service """ helps[ diff --git a/azext_iot/_params.py b/azext_iot/_params.py index 4374780b9..2877c367f 100644 --- a/azext_iot/_params.py +++ b/azext_iot/_params.py @@ -380,6 +380,21 @@ def load_arguments(self, _): "'service' uses the TLS 1.3 service hostname (errors if not GWv2).", ) + with self.argument_context("iot hub generate-sas-token") as context: + context.argument( + "hostname_type", + options_list=["--hostname-type", "--ht"], + arg_type=get_enum_type(HostnameType), + default=HostnameType.AUTO.value, + help="Type of hostname to use as the SAS token audience (sr=). " + "'auto' uses the TLS 1.3 device hostname for device/module-scoped tokens " + "and the service hostname for hub-scoped tokens (on GWv2 hubs). " + "'classic' always uses the default hostname. " + "'device' uses the TLS 1.3 device hostname (errors if not GWv2). " + "'service' uses the TLS 1.3 service hostname (errors if not GWv2 or if " + "a device/module is specified).", + ) + with self.argument_context("iot hub job") as context: context.argument("job_id", options_list=["--job-id"], help="IoT Hub job Id.") context.argument( diff --git a/azext_iot/core/custom.py b/azext_iot/core/custom.py index 7cfe0a57f..b449c35a7 100644 --- a/azext_iot/core/custom.py +++ b/azext_iot/core/custom.py @@ -98,6 +98,22 @@ def _resolve_linked_hub_hostname(hub, hostname_type="auto"): return device_hostname or hub["properties"]["hostName"] +def _linked_hub_hostname(hub): + """Return the raw hostname for a linked-hub entry, checking ``hostName``, + ``connectionString`` and ``name`` in turn. Returns "" if none is available. + """ + hostname = hub.get("hostName", "") or "" + if not hostname: + cs = hub.get("connectionString", "") or "" + for part in cs.split(";"): + if part.lower().startswith("hostname="): + hostname = part.split("=", 1)[1] + break + if not hostname: + hostname = hub.get("name", "") or "" + return hostname + + def _warn_mixed_endpoint_types(linked_hubs): """Warn if DPS dynamic allocation references hubs with mixed hostname types.""" types = set() @@ -105,15 +121,7 @@ def _warn_mixed_endpoint_types(linked_hubs): # Only check hubs participating in allocation if hub.get("applyAllocationPolicy") is False: continue - hostname = hub.get("hostName", "") - if not hostname: - cs = hub.get("connectionString", "") - for part in cs.split(";"): - if part.lower().startswith("hostname="): - hostname = part.split("=", 1)[1] - break - if not hostname: - hostname = hub.get("name", "") + hostname = _linked_hub_hostname(hub) parts = hostname.split(".") if len(parts) > 1 and parts[1] == "device": types.add("device") @@ -478,6 +486,20 @@ def iot_dps_linked_hub_create( if allocation_weight is not None: linked_hub_entry["allocationWeight"] = allocation_weight + # Reject duplicate hub linking (same hub under any hostname type) — bug bash 0.31.0b1 + new_short_name = _linked_hub_hostname(linked_hub_entry).split(".")[0].lower() + existing_short_names = { + _linked_hub_hostname(h).split(".")[0].lower() + for h in dps["properties"]["iotHubs"] + } + existing_short_names.discard("") + if new_short_name and new_short_name in existing_short_names: + raise InvalidArgumentValueError( + f"IoT Hub '{new_short_name}' is already linked to DPS '{dps_name}'. " + "Remove the existing link with 'az iot dps linked-hub delete' " + "before re-linking under a different hostname type or authentication method." + ) + dps["properties"]["iotHubs"].append(linked_hub_entry) # Warn if linked hubs have mixed hostname types (device + classic) diff --git a/azext_iot/operations/hub.py b/azext_iot/operations/hub.py index 6b2962ee5..c3e804ada 100644 --- a/azext_iot/operations/hub.py +++ b/azext_iot/operations/hub.py @@ -2240,6 +2240,7 @@ def iot_get_sas_token( module_id=None, auth_type_dataplane=None, connection_string=None, + hostname_type=HostnameType.AUTO.value, ): key_type = key_type.lower() policy_name = policy_name.lower() @@ -2256,6 +2257,12 @@ def iot_get_sas_token( raise ArgumentUsageError( "You are unable to get sas token for module without device information." ) + if device_id and hostname_type == HostnameType.SERVICE.value: + raise InvalidArgumentValueError( + "Hostname type 'service' is not supported for device or module SAS tokens. " + "Devices and modules cannot authenticate against the service endpoint. " + "Use 'auto', 'device', or 'classic' instead." + ) if connection_string: return { @@ -2277,6 +2284,7 @@ def iot_get_sas_token( resource_group_name, login, auth_type_dataplane, + hostname_type, ).generate_sas_token() } @@ -2330,6 +2338,7 @@ def _iot_build_sas_token( resource_group_name=None, login=None, auth_type_dataplane=None, + hostname_type=HostnameType.AUTO.value, ): from azext_iot.common._azure import ( parse_iot_device_connection_string, @@ -2352,6 +2361,18 @@ def _iot_build_sas_token( policy = None key = None + # Resolve the hostname used to build the SAS audience (`sr=`). + # Device/module scopes default to the device endpoint on GWv2 hubs; hub-level + # scope defaults to the service endpoint. The `service` hostname type was + # already rejected for device/module scopes in iot_get_sas_token. + auto_tls_key = "deviceHostName" if device_id else "serviceHostName" + if login: + resolved_host = _transform_hostname(target["entity"], hostname_type) + else: + resolved_host = _resolve_hostname_by_type( + target, hostname_type, auto_tls_key=auto_tls_key + ) + if device_id: logger.info( 'Obtaining device "%s" details from registry, using IoT Hub policy "%s"', @@ -2365,7 +2386,7 @@ def _iot_build_sas_token( entity=module, key_type=key_type ) uri = "{}/devices/{}/modules/{}".format( - target["entity"], device_id, module_id + resolved_host, device_id, module_id ) try: parsed_module_cs = parse_iot_device_module_connection_string(module_cs) @@ -2378,7 +2399,7 @@ def _iot_build_sas_token( device_cs = _build_device_or_module_connection_string( entity=device, key_type=key_type ) - uri = "{}/devices/{}".format(target["entity"], device_id) + uri = "{}/devices/{}".format(resolved_host, device_id) try: parsed_device_cs = parse_iot_device_connection_string(device_cs) except ValueError as e: @@ -2387,7 +2408,7 @@ def _iot_build_sas_token( key = parsed_device_cs["SharedAccessKey"] else: - uri = target["entity"] + uri = resolved_host policy = target["policy"] key = target["primarykey"] if key_type == "primary" else target["secondarykey"] @@ -2468,6 +2489,12 @@ def iot_get_device_connection_string( auth_type_dataplane=None, hostname_type=HostnameType.AUTO.value, ): + if hostname_type == HostnameType.SERVICE.value: + raise InvalidArgumentValueError( + "Hostname type 'service' is not supported for device connection strings. " + "Devices cannot authenticate against the service endpoint. " + "Use 'auto', 'device', or 'classic' instead." + ) result = {} discovery = IotHubDiscovery(cmd) target = discovery.get_target( @@ -2498,6 +2525,12 @@ def iot_get_module_connection_string( auth_type_dataplane=None, hostname_type=HostnameType.AUTO.value, ): + if hostname_type == HostnameType.SERVICE.value: + raise InvalidArgumentValueError( + "Hostname type 'service' is not supported for module connection strings. " + "Modules cannot authenticate against the service endpoint. " + "Use 'auto', 'device', or 'classic' instead." + ) result = {} discovery = IotHubDiscovery(cmd) target = discovery.get_target( diff --git a/azext_iot/tests/dps/core/test_dps_linked_hub_int.py b/azext_iot/tests/dps/core/test_dps_linked_hub_int.py index 4daf916ae..a66ff6f76 100644 --- a/azext_iot/tests/dps/core/test_dps_linked_hub_int.py +++ b/azext_iot/tests/dps/core/test_dps_linked_hub_int.py @@ -172,3 +172,49 @@ def test_linked_hub_list_shows_hostname(provisioned_iot_dps_no_hub_module): f"Should find linked hub with name '{device_hostname}'. Found: {[h['name'] for h in linked_hubs]}" finally: _cleanup_linked_hub(dps_name, dps_rg, device_hostname) + + +def test_linked_hub_create_rejects_duplicate_cross_hostname_type(provisioned_iot_dps_no_hub_module): + """Bug-bash #7: linking the same hub a second time (with different --hostname-type) must fail. + + Step 1: link hub as classic. + Step 2: try to re-link the same hub as device → expect non-zero exit code. + """ + dps_name = provisioned_iot_dps_no_hub_module["name"] + dps_rg = provisioned_iot_dps_no_hub_module["resourceGroup"] + + gwv2_hub = _find_gwv2_hub(dps_rg) + if not gwv2_hub: + pytest.skip("No GWv2 hub available in resource group") + + hub_name = gwv2_hub["name"] + classic_hostname = gwv2_hub["properties"]["hostName"] + device_hostname = gwv2_hub["properties"]["deviceHostName"] + + try: + # Step 1: first link succeeds + cli.invoke( + f"iot dps linked-hub create --dps-name {dps_name} -g {dps_rg} " + f"--hub-name {hub_name} --hostname-type classic" + ) + + # Step 2: re-linking the same hub under a different hostname type must fail. + result = cli.invoke( + f"iot dps linked-hub create --dps-name {dps_name} -g {dps_rg} " + f"--hub-name {hub_name} --hostname-type device" + ) + assert not result.success(), \ + "Re-linking the same hub under a different hostname type should be rejected" + + # Verify only the original classic entry remains. + linked_hubs = cli.invoke( + f"iot dps linked-hub list --dps-name {dps_name} -g {dps_rg}" + ).as_json() + names = [h["name"] for h in linked_hubs] + assert classic_hostname in names, \ + f"Original classic link should still exist. Names: {names}" + assert device_hostname not in names, \ + f"Device-hostname entry should NOT have been created. Names: {names}" + finally: + _cleanup_linked_hub(dps_name, dps_rg, classic_hostname) + _cleanup_linked_hub(dps_name, dps_rg, device_hostname) diff --git a/azext_iot/tests/dps/core/test_dps_linked_hub_unit.py b/azext_iot/tests/dps/core/test_dps_linked_hub_unit.py index 205f00d8c..79133a22b 100644 --- a/azext_iot/tests/dps/core/test_dps_linked_hub_unit.py +++ b/azext_iot/tests/dps/core/test_dps_linked_hub_unit.py @@ -10,7 +10,7 @@ MutuallyExclusiveArgumentError, RequiredArgumentMissingError, ) -from azext_iot.core.custom import _resolve_linked_hub_hostname, _warn_mixed_endpoint_types +from azext_iot.core.custom import _resolve_linked_hub_hostname, _warn_mixed_endpoint_types, _linked_hub_hostname class TestResolveLinkedHubHostname: @@ -124,6 +124,148 @@ def test_mi_null_identity_on_dps(self, fixture_cmd, mock_deps, mocker): ) +class TestLinkedHubCreateDuplicateGuard: + """Bug-bash #7 — reject re-linking the same hub under a different hostname type or auth method.""" + + @pytest.fixture + def mock_deps_factory(self, mocker): + """Returns a factory that builds the mock_deps with a configurable pre-existing iotHubs list.""" + def _factory(existing_hubs, hub_get_response=None): + mocker.patch("azext_iot.core.custom.iot_hub_service_factory") + mocker.patch("azext_iot.core.custom.iot_hub_get", return_value=hub_get_response or { + "properties": {"deviceHostName": "hub.device.azure-devices.net", "hostName": "hub.azure-devices.net"}, + "location": "eastus2euap", + "resourcegroup": "test-rg", + }) + mocker.patch("azext_iot.core.custom.iot_hub_policy_get", return_value={ + "keyName": "iothubowner", "primaryKey": "testkey" + }) + mocker.patch("azext_iot.core.custom._ensure_dps_resource_group_name", return_value="test-rg") + mocker.patch("azext_iot.core.custom.iot_dps_get", return_value={ + "identity": {"type": "SystemAssigned,UserAssigned"}, + "properties": {"iotHubs": list(existing_hubs)}, + }) + mocker.patch("azext_iot.core.custom.LongRunningOperation") + mocker.patch("azext_iot.core.custom.iot_dps_linked_hub_list", return_value=list(existing_hubs)) + mock_client = mocker.MagicMock() + return mock_client + return _factory + + def test_duplicate_rejected_same_hostname_type_mi(self, fixture_cmd, mock_deps_factory): + from azext_iot.core.custom import iot_dps_linked_hub_create + client = mock_deps_factory([{"name": "hub.device.azure-devices.net"}]) + with pytest.raises(InvalidArgumentValueError, match="already linked"): + iot_dps_linked_hub_create( + cmd=fixture_cmd, client=client, dps_name="dps", + hub_name="hub", authentication_type="SystemAssigned" + ) + + def test_duplicate_rejected_cross_hostname_type_classic_then_device(self, fixture_cmd, mock_deps_factory): + """Hub previously linked as classic — re-linking as --hostname-type device must be rejected.""" + from azext_iot.core.custom import iot_dps_linked_hub_create + client = mock_deps_factory([{"name": "hub.azure-devices.net"}]) + with pytest.raises(InvalidArgumentValueError, match="already linked"): + iot_dps_linked_hub_create( + cmd=fixture_cmd, client=client, dps_name="dps", + hub_name="hub", authentication_type="SystemAssigned", + hostname_type="device" + ) + + def test_duplicate_rejected_cross_hostname_type_device_then_classic(self, fixture_cmd, mock_deps_factory): + """Hub previously linked as device — re-linking as classic must be rejected.""" + from azext_iot.core.custom import iot_dps_linked_hub_create + client = mock_deps_factory([{"hostName": "hub.device.azure-devices.net"}]) + with pytest.raises(InvalidArgumentValueError, match="already linked"): + iot_dps_linked_hub_create( + cmd=fixture_cmd, client=client, dps_name="dps", + hub_name="hub", authentication_type="SystemAssigned", + hostname_type="classic" + ) + + def test_duplicate_rejected_cs_then_mi(self, fixture_cmd, mock_deps_factory): + """Hub previously linked via CS — re-linking via MI must be rejected.""" + from azext_iot.core.custom import iot_dps_linked_hub_create + existing = [{ + "connectionString": "HostName=hub.azure-devices.net;SharedAccessKeyName=iothubowner;SharedAccessKey=k", + }] + client = mock_deps_factory(existing) + with pytest.raises(InvalidArgumentValueError, match="already linked"): + iot_dps_linked_hub_create( + cmd=fixture_cmd, client=client, dps_name="dps", + hub_name="hub", authentication_type="SystemAssigned" + ) + + def test_duplicate_rejected_mi_then_cs(self, fixture_cmd, mock_deps_factory, mocker): + """Hub previously linked via MI — re-linking via CS must be rejected.""" + from azext_iot.core.custom import iot_dps_linked_hub_create + mocker.patch("azext_iot.core.custom.iot_hub_get_stats") + client = mock_deps_factory([{"hostName": "hub.device.azure-devices.net"}]) + with pytest.raises(InvalidArgumentValueError, match="already linked"): + iot_dps_linked_hub_create( + cmd=fixture_cmd, client=client, dps_name="dps", + connection_string="HostName=hub.azure-devices.net;SharedAccessKeyName=iothubowner;SharedAccessKey=k", + location="eastus2euap", + ) + + def test_duplicate_rejected_cs_same_hostname(self, fixture_cmd, mock_deps_factory): + """Hub previously linked via CS — re-linking via CS with the same hostname must be rejected.""" + from azext_iot.core.custom import iot_dps_linked_hub_create + existing = [{ + "connectionString": "HostName=hub.azure-devices.net;SharedAccessKeyName=iothubowner;SharedAccessKey=k1", + }] + client = mock_deps_factory(existing) + with pytest.raises(InvalidArgumentValueError, match="already linked"): + iot_dps_linked_hub_create( + cmd=fixture_cmd, client=client, dps_name="dps", + connection_string="HostName=hub.azure-devices.net;SharedAccessKeyName=iothubowner;SharedAccessKey=k2", + location="eastus2euap", + ) + + def test_different_hub_allowed(self, fixture_cmd, mock_deps_factory, mocker): + """Linking a different hub does not collide with an existing link.""" + from azext_iot.core.custom import iot_dps_linked_hub_create + client = mock_deps_factory( + existing_hubs=[{"name": "hub.device.azure-devices.net"}], + hub_get_response={ + "properties": {"deviceHostName": "other.device.azure-devices.net", "hostName": "other.azure-devices.net"}, + "location": "eastus2euap", + "resourcegroup": "test-rg", + }, + ) + # Should not raise the duplicate guard. We don't assert success because the surrounding + # path makes management-plane calls we don't fully mock; we only assert the dup-guard + # error message is NOT raised. + try: + iot_dps_linked_hub_create( + cmd=fixture_cmd, client=client, dps_name="dps", + hub_name="other", authentication_type="SystemAssigned" + ) + except InvalidArgumentValueError as ex: + assert "already linked" not in str(ex) + + def test_empty_existing_allowed(self, fixture_cmd, mock_deps_factory): + """First link onto an empty DPS does not trigger the guard.""" + from azext_iot.core.custom import iot_dps_linked_hub_create + client = mock_deps_factory([]) + try: + iot_dps_linked_hub_create( + cmd=fixture_cmd, client=client, dps_name="dps", + hub_name="hub", authentication_type="SystemAssigned" + ) + except InvalidArgumentValueError as ex: + assert "already linked" not in str(ex) + + def test_dup_guard_short_name_case_insensitive(self, fixture_cmd, mock_deps_factory): + """Short-name comparison is case-insensitive.""" + from azext_iot.core.custom import iot_dps_linked_hub_create + client = mock_deps_factory([{"name": "HUB.azure-devices.net"}]) + with pytest.raises(InvalidArgumentValueError, match="already linked"): + iot_dps_linked_hub_create( + cmd=fixture_cmd, client=client, dps_name="dps", + hub_name="hub", authentication_type="SystemAssigned" + ) + + class TestMixedEndpointWarning: def test_no_warning_all_device(self, caplog): hubs = [ @@ -169,3 +311,37 @@ def test_no_warning_single_hub(self, caplog): def test_no_warning_empty(self, caplog): _warn_mixed_endpoint_types([]) assert "mixed hostname types" not in caplog.text + + +class TestLinkedHubHostname: + """Tests for the _linked_hub_hostname helper used by both the mixed-endpoint warning + and the duplicate-link guard.""" + + def test_hostname_from_hostname_key(self): + assert _linked_hub_hostname({"hostName": "hub.device.azure-devices.net"}) == "hub.device.azure-devices.net" + + def test_hostname_from_connection_string(self): + entry = {"connectionString": "HostName=hub.azure-devices.net;SharedAccessKeyName=k;SharedAccessKey=v"} + assert _linked_hub_hostname(entry) == "hub.azure-devices.net" + + def test_hostname_from_name_key(self): + assert _linked_hub_hostname({"name": "hub.service.azure-devices.net"}) == "hub.service.azure-devices.net" + + def test_hostname_prefers_hostName_over_connection_string(self): + entry = { + "hostName": "primary.device.azure-devices.net", + "connectionString": "HostName=secondary.azure-devices.net;SharedAccessKey=v", + } + assert _linked_hub_hostname(entry) == "primary.device.azure-devices.net" + + def test_hostname_missing_returns_empty(self): + assert _linked_hub_hostname({}) == "" + assert _linked_hub_hostname({"hostName": None, "connectionString": None, "name": None}) == "" + + def test_hostname_cs_with_no_hostname_field(self): + entry = {"connectionString": "SharedAccessKeyName=k;SharedAccessKey=v"} + assert _linked_hub_hostname(entry) == "" + + def test_hostname_cs_case_insensitive(self): + entry = {"connectionString": "hostname=hub.azure-devices.net;SharedAccessKey=v"} + assert _linked_hub_hostname(entry) == "hub.azure-devices.net" diff --git a/azext_iot/tests/iothub/core/test_iothub_utilities_int.py b/azext_iot/tests/iothub/core/test_iothub_utilities_int.py index 927c6eedb..c37384361 100644 --- a/azext_iot/tests/iothub/core/test_iothub_utilities_int.py +++ b/azext_iot/tests/iothub/core/test_iothub_utilities_int.py @@ -67,6 +67,70 @@ def test_iothub_generate_sas_token(self): checks=[self.exists("sas")], ) + def test_iothub_generate_sas_token_hostname_type(self): + """Bug-bash #9: --hostname-type permutations for hub-level SAS. + + Verifies the `sr=` audience in the generated SAS token matches the requested hostname type. + """ + from urllib.parse import unquote + + def extract_sr(sas_payload): + sas = sas_payload["sas"].replace("SharedAccessSignature ", "") + for part in sas.split("&"): + if part.startswith("sr="): + return unquote(part[3:]) + raise AssertionError(f"sas token had no sr= component: {sas}") + + hub = self.cmd( + f"iot hub show -n {self.entity_name} -g {self.entity_rg}" + ).get_output_in_json() + props = hub["properties"] + classic_hn = props["hostName"] + device_hn = props.get("deviceHostName") + service_hn = props.get("serviceHostName") + is_gwv2 = bool(device_hn and service_hn) + + # auto: defaults to service hostname on GWv2, classic on V1 + token = self.cmd( + f"iot hub generate-sas-token -n {self.entity_name} -g {self.entity_rg}", + checks=[self.exists("sas")], + ).get_output_in_json() + expected_auto = service_hn if is_gwv2 else classic_hn + assert extract_sr(token) == expected_auto, \ + f"auto: expected sr={expected_auto}, got {extract_sr(token)}" + + # classic + token = self.cmd( + f"iot hub generate-sas-token -n {self.entity_name} -g {self.entity_rg} --hostname-type classic", + checks=[self.exists("sas")], + ).get_output_in_json() + assert extract_sr(token) == classic_hn + + if is_gwv2: + # service (only valid on GWv2) + token = self.cmd( + f"iot hub generate-sas-token -n {self.entity_name} -g {self.entity_rg} --hostname-type service", + checks=[self.exists("sas")], + ).get_output_in_json() + assert extract_sr(token) == service_hn + + # device (valid for hub-level SAS too — produces device-endpoint audience) + token = self.cmd( + f"iot hub generate-sas-token -n {self.entity_name} -g {self.entity_rg} --hostname-type device", + checks=[self.exists("sas")], + ).get_output_in_json() + assert extract_sr(token) == device_hn + else: + # service / device must error on classic hubs + self.cmd( + f"iot hub generate-sas-token -n {self.entity_name} -g {self.entity_rg} --hostname-type service", + expect_failure=True, + ) + self.cmd( + f"iot hub generate-sas-token -n {self.entity_name} -g {self.entity_rg} --hostname-type device", + expect_failure=True, + ) + def test_iothub_connection_string_show(self): conn_str_pattern = r"^HostName={0}(\.\w+)?\.azure-devices\.net;SharedAccessKeyName=iothubowner;SharedAccessKey=".format( self.entity_name diff --git a/azext_iot/tests/iothub/core/test_iothub_utilities_unit.py b/azext_iot/tests/iothub/core/test_iothub_utilities_unit.py index 1cb3dc307..86e59af45 100644 --- a/azext_iot/tests/iothub/core/test_iothub_utilities_unit.py +++ b/azext_iot/tests/iothub/core/test_iothub_utilities_unit.py @@ -83,3 +83,315 @@ def test_generate_sas_token_from_cs_error(self, mocker, fixture_cmd, req): cmd=fixture_cmd, connection_string=req["connection_string"], ) + + +class TestGenerateSasTokenHostnameType: + """Bug-bash #9 — generate-sas-token --hostname-type plumbing. + + Default behavior shifts on GWv2 hubs: + - Hub-level SAS (no -d): `sr=` uses the service hostname (GWv2) or classic hostname (classic hub). + - Device-level SAS (-d): `sr=` uses the device hostname (GWv2) or classic hostname (classic hub). + - Module-level SAS (-d -m): same as device-level. + Explicit `--hostname-type service` is rejected for device/module scopes. + Explicit `--hostname-type device`/`service` raises on classic hubs. + """ + + GWV2_TARGET = { + "entity": "mygwv2hub.service.azure-devices.net", + "policy": "iothubowner", + "primarykey": "cHJpbWFyeUtleQ==", + "secondarykey": "c2Vjb25kYXJ5S2V5", + "name": "mygwv2hub", + "subscription": "sub", + "resourcegroup": "rg", + "deviceHostName": "mygwv2hub.device.azure-devices.net", + "serviceHostName": "mygwv2hub.service.azure-devices.net", + "location": "eastus2euap", + "cmd": None, + } + CLASSIC_TARGET = { + "entity": "myclassichub.azure-devices.net", + "policy": "iothubowner", + "primarykey": "cHJpbWFyeUtleQ==", + "secondarykey": "c2Vjb25kYXJ5S2V5", + "name": "myclassichub", + "subscription": "sub", + "resourcegroup": "rg", + "location": "eastus", + "cmd": None, + } + DEVICE_ID = "device1" + MODULE_ID = "module1" + DEVICE_KEY = "ZGV2aWNlS2V5MQ==" + MODULE_KEY = "bW9kdWxlS2V5MQ==" + + @pytest.fixture + def patch_discovery(self, mocker): + """Returns a helper that patches IotHubDiscovery.get_target to return the given target.""" + def _patch(target): + mocker.patch( + "azext_iot.operations.hub.IotHubDiscovery.get_target", + return_value=dict(target), + ) + return _patch + + @pytest.fixture + def patch_device_show(self, mocker): + def _patch(): + mocker.patch( + "azext_iot.operations.hub._iot_device_show", + return_value={ + "deviceId": TestGenerateSasTokenHostnameType.DEVICE_ID, + "authentication": { + "type": "sas", + "symmetricKey": { + "primaryKey": TestGenerateSasTokenHostnameType.DEVICE_KEY, + "secondaryKey": "device-secondary", + }, + }, + }, + ) + return _patch + + @pytest.fixture + def patch_module_show(self, mocker): + def _patch(): + mocker.patch( + "azext_iot.operations.hub._iot_device_show", + return_value={ + "deviceId": TestGenerateSasTokenHostnameType.DEVICE_ID, + "authentication": { + "type": "sas", + "symmetricKey": { + "primaryKey": TestGenerateSasTokenHostnameType.DEVICE_KEY, + "secondaryKey": "device-secondary", + }, + }, + }, + ) + mocker.patch( + "azext_iot.operations.hub._iot_device_module_show", + return_value={ + "deviceId": TestGenerateSasTokenHostnameType.DEVICE_ID, + "moduleId": TestGenerateSasTokenHostnameType.MODULE_ID, + "authentication": { + "type": "sas", + "symmetricKey": { + "primaryKey": TestGenerateSasTokenHostnameType.MODULE_KEY, + "secondaryKey": "module-secondary", + }, + }, + }, + ) + return _patch + + @staticmethod + def _extract_sr(sas_token): + """Extract the URL-decoded `sr=` audience from a SAS token string.""" + from urllib.parse import unquote + sas = sas_token["sas"] + # sas tokens look like: "SharedAccessSignature sr=&sig=...&se=...&skn=..." + parts = sas.replace("SharedAccessSignature ", "").split("&") + for part in parts: + if part.startswith("sr="): + return unquote(part[3:]) + raise AssertionError(f"sas token had no sr= component: {sas}") + + # ===== Hub-level SAS (no device) ===== + + def test_hub_sas_auto_on_gwv2_uses_service_endpoint(self, fixture_cmd, patch_discovery): + """Default (auto) on a GWv2 hub for hub-level SAS resolves to the service endpoint.""" + patch_discovery(self.GWV2_TARGET) + token = subject.iot_get_sas_token(cmd=fixture_cmd, hub_name_or_hostname="mygwv2hub") + assert self._extract_sr(token) == "mygwv2hub.service.azure-devices.net" + + def test_hub_sas_explicit_service(self, fixture_cmd, patch_discovery): + patch_discovery(self.GWV2_TARGET) + token = subject.iot_get_sas_token( + cmd=fixture_cmd, hub_name_or_hostname="mygwv2hub", hostname_type="service" + ) + assert self._extract_sr(token) == "mygwv2hub.service.azure-devices.net" + + def test_hub_sas_explicit_device(self, fixture_cmd, patch_discovery): + patch_discovery(self.GWV2_TARGET) + token = subject.iot_get_sas_token( + cmd=fixture_cmd, hub_name_or_hostname="mygwv2hub", hostname_type="device" + ) + assert self._extract_sr(token) == "mygwv2hub.device.azure-devices.net" + + def test_hub_sas_explicit_classic(self, fixture_cmd, patch_discovery): + patch_discovery(self.GWV2_TARGET) + token = subject.iot_get_sas_token( + cmd=fixture_cmd, hub_name_or_hostname="mygwv2hub", hostname_type="classic" + ) + assert self._extract_sr(token) == "mygwv2hub.azure-devices.net" + + def test_hub_sas_auto_on_classic_hub(self, fixture_cmd, patch_discovery): + patch_discovery(self.CLASSIC_TARGET) + token = subject.iot_get_sas_token(cmd=fixture_cmd, hub_name_or_hostname="myclassichub") + assert self._extract_sr(token) == "myclassichub.azure-devices.net" + + def test_hub_sas_explicit_service_on_classic_hub_errors(self, fixture_cmd, patch_discovery): + patch_discovery(self.CLASSIC_TARGET) + with pytest.raises(CLIError, match="not available"): + subject.iot_get_sas_token( + cmd=fixture_cmd, hub_name_or_hostname="myclassichub", hostname_type="service" + ) + + def test_hub_sas_explicit_device_on_classic_hub_errors(self, fixture_cmd, patch_discovery): + patch_discovery(self.CLASSIC_TARGET) + with pytest.raises(CLIError, match="not available"): + subject.iot_get_sas_token( + cmd=fixture_cmd, hub_name_or_hostname="myclassichub", hostname_type="device" + ) + + # ===== Device-level SAS ===== + + def test_device_sas_auto_on_gwv2_uses_device_endpoint( + self, fixture_cmd, patch_discovery, patch_device_show + ): + """Default (auto) on a GWv2 hub for device-level SAS resolves to the device endpoint.""" + patch_discovery(self.GWV2_TARGET) + patch_device_show() + token = subject.iot_get_sas_token( + cmd=fixture_cmd, hub_name_or_hostname="mygwv2hub", device_id=self.DEVICE_ID + ) + assert self._extract_sr(token) == f"mygwv2hub.device.azure-devices.net/devices/{self.DEVICE_ID}" + + def test_device_sas_explicit_device(self, fixture_cmd, patch_discovery, patch_device_show): + patch_discovery(self.GWV2_TARGET) + patch_device_show() + token = subject.iot_get_sas_token( + cmd=fixture_cmd, hub_name_or_hostname="mygwv2hub", + device_id=self.DEVICE_ID, hostname_type="device", + ) + assert self._extract_sr(token) == f"mygwv2hub.device.azure-devices.net/devices/{self.DEVICE_ID}" + + def test_device_sas_explicit_classic(self, fixture_cmd, patch_discovery, patch_device_show): + patch_discovery(self.GWV2_TARGET) + patch_device_show() + token = subject.iot_get_sas_token( + cmd=fixture_cmd, hub_name_or_hostname="mygwv2hub", + device_id=self.DEVICE_ID, hostname_type="classic", + ) + assert self._extract_sr(token) == f"mygwv2hub.azure-devices.net/devices/{self.DEVICE_ID}" + + def test_device_sas_explicit_service_rejected(self, fixture_cmd, patch_discovery): + """Device-scope SAS with --hostname-type service must be rejected up-front.""" + with pytest.raises(CLIError, match="not supported for device or module"): + subject.iot_get_sas_token( + cmd=fixture_cmd, hub_name_or_hostname="mygwv2hub", + device_id=self.DEVICE_ID, hostname_type="service", + ) + + def test_device_sas_auto_on_classic_hub( + self, fixture_cmd, patch_discovery, patch_device_show + ): + patch_discovery(self.CLASSIC_TARGET) + patch_device_show() + token = subject.iot_get_sas_token( + cmd=fixture_cmd, hub_name_or_hostname="myclassichub", device_id=self.DEVICE_ID + ) + assert self._extract_sr(token) == f"myclassichub.azure-devices.net/devices/{self.DEVICE_ID}" + + # ===== Module-level SAS ===== + + def test_module_sas_auto_on_gwv2_uses_device_endpoint( + self, fixture_cmd, patch_discovery, patch_module_show + ): + patch_discovery(self.GWV2_TARGET) + patch_module_show() + token = subject.iot_get_sas_token( + cmd=fixture_cmd, hub_name_or_hostname="mygwv2hub", + device_id=self.DEVICE_ID, module_id=self.MODULE_ID, + ) + expected = f"mygwv2hub.device.azure-devices.net/devices/{self.DEVICE_ID}/modules/{self.MODULE_ID}" + assert self._extract_sr(token) == expected + + def test_module_sas_explicit_service_rejected(self, fixture_cmd): + with pytest.raises(CLIError, match="not supported for device or module"): + subject.iot_get_sas_token( + cmd=fixture_cmd, hub_name_or_hostname="mygwv2hub", + device_id=self.DEVICE_ID, module_id=self.MODULE_ID, + hostname_type="service", + ) + + def test_module_sas_explicit_classic( + self, fixture_cmd, patch_discovery, patch_module_show + ): + patch_discovery(self.GWV2_TARGET) + patch_module_show() + token = subject.iot_get_sas_token( + cmd=fixture_cmd, hub_name_or_hostname="mygwv2hub", + device_id=self.DEVICE_ID, module_id=self.MODULE_ID, + hostname_type="classic", + ) + expected = f"mygwv2hub.azure-devices.net/devices/{self.DEVICE_ID}/modules/{self.MODULE_ID}" + assert self._extract_sr(token) == expected + + # ===== --login mode (CS-based; _transform_hostname path) ===== + + def test_hub_sas_login_mode_classic_string_transformed_to_service( + self, fixture_cmd, patch_discovery + ): + """In --login mode, the target's entity comes from the CS; --hostname-type rewrites it.""" + login_target = { + "entity": "loginhub.azure-devices.net", + "policy": "iothubowner", + "primarykey": "cHJpbWFyeUtleQ==", + "secondarykey": "c2Vjb25kYXJ5S2V5", + "name": "loginhub", + "cmd": None, + } + patch_discovery(login_target) + token = subject.iot_get_sas_token( + cmd=fixture_cmd, login="HostName=loginhub.azure-devices.net;SharedAccessKeyName=iothubowner;SharedAccessKey=k", + hostname_type="service", + ) + assert self._extract_sr(token) == "loginhub.service.azure-devices.net" + + def test_device_sas_login_mode_device_hostname_transform( + self, fixture_cmd, patch_discovery, patch_device_show + ): + login_target = { + "entity": "loginhub.azure-devices.net", + "policy": "iothubowner", + "primarykey": "cHJpbWFyeUtleQ==", + "secondarykey": "c2Vjb25kYXJ5S2V5", + "name": "loginhub", + "cmd": None, + } + patch_discovery(login_target) + patch_device_show() + token = subject.iot_get_sas_token( + cmd=fixture_cmd, login="HostName=loginhub.azure-devices.net;SharedAccessKeyName=iothubowner;SharedAccessKey=k", + device_id=self.DEVICE_ID, hostname_type="device", + ) + assert self._extract_sr(token) == f"loginhub.device.azure-devices.net/devices/{self.DEVICE_ID}" + + +class TestConnectionStringServiceRejection: + """Bug-bash #8 — device/module connection-string commands reject --hostname-type service.""" + + def test_device_cs_service_rejected(self, fixture_cmd): + with pytest.raises(CLIError, match="not supported for device"): + subject.iot_get_device_connection_string( + cmd=fixture_cmd, device_id="d1", hub_name_or_hostname="hub", + hostname_type="service", + ) + + def test_module_cs_service_rejected(self, fixture_cmd): + with pytest.raises(CLIError, match="not supported for module"): + subject.iot_get_module_connection_string( + cmd=fixture_cmd, device_id="d1", module_id="m1", + hub_name_or_hostname="hub", hostname_type="service", + ) + + def test_device_cs_with_login_service_rejected(self, fixture_cmd): + """The guard fires before discovery, regardless of login vs ARM auth.""" + with pytest.raises(CLIError, match="not supported for device"): + subject.iot_get_device_connection_string( + cmd=fixture_cmd, device_id="d1", + login="HostName=hub.azure-devices.net;SharedAccessKeyName=iothubowner;SharedAccessKey=k", + hostname_type="service", + ) diff --git a/azext_iot/tests/iothub/devices/test_iothub_devices_int.py b/azext_iot/tests/iothub/devices/test_iothub_devices_int.py index 3346cfdaf..4d01ffa8d 100644 --- a/azext_iot/tests/iothub/devices/test_iothub_devices_int.py +++ b/azext_iot/tests/iothub/devices/test_iothub_devices_int.py @@ -458,3 +458,97 @@ def test_iothub_device_generate_sas_token(self): f"iot hub generate-sas-token -d {device_ids[0]} --login {mixed_case_cstring}", checks=[self.exists("sas")], ) + + def test_iothub_device_hostname_type_permutations(self): + """Bug-bash #8 and #9: --hostname-type permutations for device-scope CS-show and SAS. + + - `--hostname-type service` must be rejected on both CS-show and SAS. + - Classic / device permutations should succeed (device only on GWv2 hubs). + - On GWv2 hubs, auto (default) resolves to the device endpoint for both CS-show and SAS. + """ + from urllib.parse import unquote + + def extract_sr(sas_payload): + sas = sas_payload["sas"].replace("SharedAccessSignature ", "") + for part in sas.split("&"): + if part.startswith("sr="): + return unquote(part[3:]) + raise AssertionError(f"sas token had no sr= component: {sas}") + + hub = self.cmd( + f"iot hub show -n {self.entity_name} -g {self.entity_rg}" + ).get_output_in_json() + props = hub["properties"] + classic_hn = props["hostName"] + device_hn = props.get("deviceHostName") + is_gwv2 = bool(device_hn) + + device_id = self.generate_device_names(1)[0] + self.cmd( + f"iot hub device-identity create -d {device_id} -n {self.entity_name} -g {self.entity_rg}" + ) + + # --- device-identity connection-string show --- + # service is always rejected (bug #8) + self.cmd( + f"iot hub device-identity connection-string show -d {device_id} " + f"-n {self.host_name} -g {self.entity_rg} --hostname-type service", + expect_failure=True, + ) + + # classic + cs_classic = self.cmd( + f"iot hub device-identity connection-string show -d {device_id} " + f"-n {self.host_name} -g {self.entity_rg} --hostname-type classic", + ).get_output_in_json()["connectionString"] + assert f"HostName={classic_hn}" in cs_classic + + if is_gwv2: + # device + cs_device = self.cmd( + f"iot hub device-identity connection-string show -d {device_id} " + f"-n {self.host_name} -g {self.entity_rg} --hostname-type device", + ).get_output_in_json()["connectionString"] + assert f"HostName={device_hn}" in cs_device + + # auto defaults to device + cs_auto = self.cmd( + f"iot hub device-identity connection-string show -d {device_id} " + f"-n {self.host_name} -g {self.entity_rg}", + ).get_output_in_json()["connectionString"] + assert f"HostName={device_hn}" in cs_auto + + # --- generate-sas-token (device scope) --- + # service is always rejected (bug #9) + self.cmd( + f"iot hub generate-sas-token -d {device_id} -n {self.host_name} -g {self.entity_rg} --hostname-type service", + expect_failure=True, + ) + + # classic + token = self.cmd( + f"iot hub generate-sas-token -d {device_id} -n {self.host_name} -g {self.entity_rg} --hostname-type classic", + checks=[self.exists("sas")], + ).get_output_in_json() + assert extract_sr(token) == f"{classic_hn}/devices/{device_id}" + + if is_gwv2: + # device + token = self.cmd( + f"iot hub generate-sas-token -d {device_id} -n {self.host_name} -g {self.entity_rg} --hostname-type device", + checks=[self.exists("sas")], + ).get_output_in_json() + assert extract_sr(token) == f"{device_hn}/devices/{device_id}" + + # auto on GWv2 defaults to device + token = self.cmd( + f"iot hub generate-sas-token -d {device_id} -n {self.host_name} -g {self.entity_rg}", + checks=[self.exists("sas")], + ).get_output_in_json() + assert extract_sr(token) == f"{device_hn}/devices/{device_id}" + else: + # device must error on classic hub + self.cmd( + f"iot hub generate-sas-token -d {device_id} -n {self.host_name} -g {self.entity_rg} --hostname-type device", + expect_failure=True, + ) diff --git a/azext_iot/tests/iothub/modules/test_iothub_modules_int.py b/azext_iot/tests/iothub/modules/test_iothub_modules_int.py index b6d8b09f7..f841626ae 100644 --- a/azext_iot/tests/iothub/modules/test_iothub_modules_int.py +++ b/azext_iot/tests/iothub/modules/test_iothub_modules_int.py @@ -439,3 +439,100 @@ def test_iothub_module_generate_sas_token(self): f"iot hub generate-sas-token -m {module_ids[0]} -d {device_ids[0]} --login {mixed_case_cstring}", checks=[self.exists("sas")], ) + + def test_iothub_module_hostname_type_permutations(self): + """Bug-bash #8 and #9: --hostname-type permutations for module-scope CS-show and SAS. + + - `--hostname-type service` must be rejected on both CS-show and SAS. + - Classic / device permutations should succeed (device only on GWv2 hubs). + - On GWv2 hubs, auto (default) resolves to the device endpoint for both CS-show and SAS. + """ + from urllib.parse import unquote + + def extract_sr(sas_payload): + sas = sas_payload["sas"].replace("SharedAccessSignature ", "") + for part in sas.split("&"): + if part.startswith("sr="): + return unquote(part[3:]) + raise AssertionError(f"sas token had no sr= component: {sas}") + + hub = self.cmd( + f"iot hub show -n {self.entity_name} -g {self.entity_rg}" + ).get_output_in_json() + props = hub["properties"] + classic_hn = props["hostName"] + device_hn = props.get("deviceHostName") + is_gwv2 = bool(device_hn) + + device_id = self.generate_device_names(1)[0] + module_id = self.generate_device_names(1)[0] + self.cmd( + f"iot hub device-identity create -d {device_id} -n {self.entity_name} -g {self.entity_rg}" + ) + self.cmd( + f"iot hub module-identity create -m {module_id} -d {device_id} -n {self.entity_name} -g {self.entity_rg}" + ) + + # --- module-identity connection-string show --- + # service is always rejected (bug #8) + self.cmd( + f"iot hub module-identity connection-string show -m {module_id} -d {device_id} " + f"-n {self.host_name} -g {self.entity_rg} --hostname-type service", + expect_failure=True, + ) + + # classic + cs_classic = self.cmd( + f"iot hub module-identity connection-string show -m {module_id} -d {device_id} " + f"-n {self.host_name} -g {self.entity_rg} --hostname-type classic", + ).get_output_in_json()["connectionString"] + assert f"HostName={classic_hn}" in cs_classic + + if is_gwv2: + cs_device = self.cmd( + f"iot hub module-identity connection-string show -m {module_id} -d {device_id} " + f"-n {self.host_name} -g {self.entity_rg} --hostname-type device", + ).get_output_in_json()["connectionString"] + assert f"HostName={device_hn}" in cs_device + + cs_auto = self.cmd( + f"iot hub module-identity connection-string show -m {module_id} -d {device_id} " + f"-n {self.host_name} -g {self.entity_rg}", + ).get_output_in_json()["connectionString"] + assert f"HostName={device_hn}" in cs_auto + + # --- generate-sas-token (module scope) --- + # service is always rejected (bug #9) + self.cmd( + f"iot hub generate-sas-token -m {module_id} -d {device_id} " + f"-n {self.host_name} -g {self.entity_rg} --hostname-type service", + expect_failure=True, + ) + + token = self.cmd( + f"iot hub generate-sas-token -m {module_id} -d {device_id} " + f"-n {self.host_name} -g {self.entity_rg} --hostname-type classic", + checks=[self.exists("sas")], + ).get_output_in_json() + assert extract_sr(token) == f"{classic_hn}/devices/{device_id}/modules/{module_id}" + + if is_gwv2: + token = self.cmd( + f"iot hub generate-sas-token -m {module_id} -d {device_id} " + f"-n {self.host_name} -g {self.entity_rg} --hostname-type device", + checks=[self.exists("sas")], + ).get_output_in_json() + assert extract_sr(token) == f"{device_hn}/devices/{device_id}/modules/{module_id}" + + token = self.cmd( + f"iot hub generate-sas-token -m {module_id} -d {device_id} " + f"-n {self.host_name} -g {self.entity_rg}", + checks=[self.exists("sas")], + ).get_output_in_json() + assert extract_sr(token) == f"{device_hn}/devices/{device_id}/modules/{module_id}" + else: + self.cmd( + f"iot hub generate-sas-token -m {module_id} -d {device_id} " + f"-n {self.host_name} -g {self.entity_rg} --hostname-type device", + expect_failure=True, + )