diff --git a/HISTORY.rst b/HISTORY.rst index 3debca45e..d02168878 100644 --- a/HISTORY.rst +++ b/HISTORY.rst @@ -3,6 +3,19 @@ Release History =============== +0.31.0b2 (Preview) ++++++++++++++++ + +**Bug fixes** + +* ``az iot hub device-identity connection-string show`` and ``az iot hub module-identity connection-string show`` now reject ``--hostname-type service``. Use ``auto``, ``device`` or ``classic`` instead. + +* ``az iot hub generate-sas-token`` gains a ``--hostname-type`` parameter to control the SAS token audience: + + - Hub-level (no ``-d``) defaults to the service hostname. + - Device-level (``-d``) and module-level (``-d -m``) default to the device hostname. + - ``service``, ``device`` and ``classic`` are explicitly selectable. + 0.31.0b1 (Preview) +++++++++++++++ diff --git a/azext_iot/_help.py b/azext_iot/_help.py index d730fb43e..e25f7ad4a 100644 --- a/azext_iot/_help.py +++ b/azext_iot/_help.py @@ -689,6 +689,12 @@ text: > az iot hub generate-sas-token --connection-string 'HostName=myhub.azure-devices.net;DeviceId=mydevice;ModuleId=mymodule;SharedAccessKeyName=iothubowner;SharedAccessKey=12345' + - name: Generate a device SAS token whose audience targets the device endpoint. + text: > + az iot hub generate-sas-token -d {device_id} -n {iothub_name} --hostname-type device + - name: Generate an IoT Hub SAS token whose audience explicitly targets the service endpoint. + text: > + az iot hub generate-sas-token -n {iothub_name} --hostname-type service """ helps[ diff --git a/azext_iot/_params.py b/azext_iot/_params.py index 4374780b9..0153aec77 100644 --- a/azext_iot/_params.py +++ b/azext_iot/_params.py @@ -380,6 +380,23 @@ def load_arguments(self, _): "'service' uses the TLS 1.3 service hostname (errors if not GWv2).", ) + with self.argument_context("iot hub generate-sas-token") as context: + context.argument( + "hostname_type", + options_list=["--hostname-type", "--ht"], + arg_type=get_enum_type(HostnameType), + default=HostnameType.AUTO.value, + help="Type of hostname to use as the SAS token audience. " + "'auto' uses the device hostname for device/module-scoped tokens " + "and the service hostname for hub-scoped tokens. " + "'classic' always uses the default hostname. " + "'device' uses the device hostname (errors on non-GWv2 hubs). " + "'service' uses the service hostname (errors on non-GWv2 hubs). " + "This option cannot be combined with --connection-string; when " + "--connection-string is supplied, the SAS audience is derived " + "from its HostName.", + ) + with self.argument_context("iot hub job") as context: context.argument("job_id", options_list=["--job-id"], help="IoT Hub job Id.") context.argument( diff --git a/azext_iot/constants.py b/azext_iot/constants.py index 4a0a8a603..0b651642a 100644 --- a/azext_iot/constants.py +++ b/azext_iot/constants.py @@ -7,7 +7,7 @@ import os -VERSION = "0.31.0b1" +VERSION = "0.31.0b2" EXTENSION_NAME = "azure-iot" EXTENSION_ROOT = os.path.dirname(os.path.abspath(__file__)) EXTENSION_CONFIG_ROOT_KEY = "iotext" diff --git a/azext_iot/operations/hub.py b/azext_iot/operations/hub.py index 6b2962ee5..3863d94b7 100644 --- a/azext_iot/operations/hub.py +++ b/azext_iot/operations/hub.py @@ -2240,22 +2240,20 @@ def iot_get_sas_token( module_id=None, auth_type_dataplane=None, connection_string=None, + hostname_type=HostnameType.AUTO.value, ): key_type = key_type.lower() policy_name = policy_name.lower() - if login and policy_name != "iothubowner": - raise ArgumentUsageError( - "You are unable to change the sas policy with a hub connection string login." - ) - if login and key_type != "primary" and not device_id: - raise ArgumentUsageError( - "For non-device sas, you are unable to change the key type with a connection string login." - ) - if module_id and not device_id: - raise ArgumentUsageError( - "You are unable to get sas token for module without device information." - ) + _validate_iot_get_sas_token_args( + login=login, + policy_name=policy_name, + key_type=key_type, + device_id=device_id, + module_id=module_id, + connection_string=connection_string, + hostname_type=hostname_type, + ) if connection_string: return { @@ -2277,10 +2275,33 @@ def iot_get_sas_token( resource_group_name, login, auth_type_dataplane, + hostname_type, ).generate_sas_token() } +def _validate_iot_get_sas_token_args( + login, policy_name, key_type, device_id, module_id, connection_string, hostname_type +): + if login and policy_name != "iothubowner": + raise ArgumentUsageError( + "You are unable to change the sas policy with a hub connection string login." + ) + if login and key_type != "primary" and not device_id: + raise ArgumentUsageError( + "For non-device sas, you are unable to change the key type with a connection string login." + ) + if module_id and not device_id: + raise ArgumentUsageError( + "You are unable to get sas token for module without device information." + ) + if connection_string and hostname_type != HostnameType.AUTO.value: + raise ArgumentUsageError( + "--hostname-type is not supported with --connection-string. " + "The SAS audience is derived from the HostName in the supplied connection string." + ) + + def _iot_build_sas_token_from_cs(connection_string, duration=3600): uri = None policy = None @@ -2330,6 +2351,7 @@ def _iot_build_sas_token( resource_group_name=None, login=None, auth_type_dataplane=None, + hostname_type=HostnameType.AUTO.value, ): from azext_iot.common._azure import ( parse_iot_device_connection_string, @@ -2352,6 +2374,10 @@ def _iot_build_sas_token( policy = None key = None + resolved_host = _resolve_sas_audience( + target, hostname_type, device_id=device_id, login=login + ) + if device_id: logger.info( 'Obtaining device "%s" details from registry, using IoT Hub policy "%s"', @@ -2365,7 +2391,7 @@ def _iot_build_sas_token( entity=module, key_type=key_type ) uri = "{}/devices/{}/modules/{}".format( - target["entity"], device_id, module_id + resolved_host, device_id, module_id ) try: parsed_module_cs = parse_iot_device_module_connection_string(module_cs) @@ -2378,7 +2404,7 @@ def _iot_build_sas_token( device_cs = _build_device_or_module_connection_string( entity=device, key_type=key_type ) - uri = "{}/devices/{}".format(target["entity"], device_id) + uri = "{}/devices/{}".format(resolved_host, device_id) try: parsed_device_cs = parse_iot_device_connection_string(device_cs) except ValueError as e: @@ -2387,7 +2413,7 @@ def _iot_build_sas_token( key = parsed_device_cs["SharedAccessKey"] else: - uri = target["entity"] + uri = resolved_host policy = target["policy"] key = target["primarykey"] if key_type == "primary" else target["secondarykey"] @@ -2410,6 +2436,25 @@ def _transform_hostname(hostname, hostname_type): return hostname_map.get(hostname_type, hostname) +def _resolve_sas_audience(target, hostname_type, device_id=None, login=None): + """Resolve the SAS audience host. + + Login mode lacks ARM metadata, so the host is string-transformed from the + CS HostName. + """ + auto_tls_key = "deviceHostName" if device_id else "serviceHostName" + if login: + effective_type = hostname_type + if effective_type == HostnameType.AUTO.value: + effective_type = ( + HostnameType.DEVICE.value if device_id else HostnameType.SERVICE.value + ) + return _transform_hostname(target["entity"], effective_type) + return _resolve_hostname_by_type( + target, hostname_type, auto_tls_key=auto_tls_key + ) + + def _resolve_hostname_by_type(target, hostname_type, auto_tls_key="deviceHostName"): classic = _transform_hostname(target["entity"], HostnameType.CLASSIC.value) if hostname_type == HostnameType.CLASSIC.value: @@ -2468,6 +2513,11 @@ def iot_get_device_connection_string( auth_type_dataplane=None, hostname_type=HostnameType.AUTO.value, ): + if hostname_type == HostnameType.SERVICE.value: + raise InvalidArgumentValueError( + "Hostname type 'service' is not supported for device connection strings. " + "Use 'auto', 'device', or 'classic' instead." + ) result = {} discovery = IotHubDiscovery(cmd) target = discovery.get_target( @@ -2498,6 +2548,11 @@ def iot_get_module_connection_string( auth_type_dataplane=None, hostname_type=HostnameType.AUTO.value, ): + if hostname_type == HostnameType.SERVICE.value: + raise InvalidArgumentValueError( + "Hostname type 'service' is not supported for module connection strings. " + "Use 'auto', 'device', or 'classic' instead." + ) result = {} discovery = IotHubDiscovery(cmd) target = discovery.get_target( diff --git a/azext_iot/tests/iothub/core/test_iothub_utilities_int.py b/azext_iot/tests/iothub/core/test_iothub_utilities_int.py index 927c6eedb..c59625fa2 100644 --- a/azext_iot/tests/iothub/core/test_iothub_utilities_int.py +++ b/azext_iot/tests/iothub/core/test_iothub_utilities_int.py @@ -67,6 +67,70 @@ def test_iothub_generate_sas_token(self): checks=[self.exists("sas")], ) + def test_iothub_generate_sas_token_hostname_type(self): + """--hostname-type permutations for hub-level SAS. + + Verifies the `sr=` audience in the generated SAS token matches the requested hostname type. + """ + from urllib.parse import unquote + + def extract_sr(sas_payload): + sas = sas_payload["sas"].replace("SharedAccessSignature ", "") + for part in sas.split("&"): + if part.startswith("sr="): + return unquote(part[3:]) + raise AssertionError(f"sas token had no sr= component: {sas}") + + hub = self.cmd( + f"iot hub show -n {self.entity_name} -g {self.entity_rg}" + ).get_output_in_json() + props = hub["properties"] + classic_hn = props["hostName"] + device_hn = props.get("deviceHostName") + service_hn = props.get("serviceHostName") + is_gwv2 = bool(device_hn and service_hn) + + # auto: defaults to service hostname on GWv2, classic on V1 + token = self.cmd( + f"iot hub generate-sas-token -n {self.entity_name} -g {self.entity_rg}", + checks=[self.exists("sas")], + ).get_output_in_json() + expected_auto = service_hn if is_gwv2 else classic_hn + assert extract_sr(token) == expected_auto, \ + f"auto: expected sr={expected_auto}, got {extract_sr(token)}" + + # classic + token = self.cmd( + f"iot hub generate-sas-token -n {self.entity_name} -g {self.entity_rg} --hostname-type classic", + checks=[self.exists("sas")], + ).get_output_in_json() + assert extract_sr(token) == classic_hn + + if is_gwv2: + # service + token = self.cmd( + f"iot hub generate-sas-token -n {self.entity_name} -g {self.entity_rg} --hostname-type service", + checks=[self.exists("sas")], + ).get_output_in_json() + assert extract_sr(token) == service_hn + + # device + token = self.cmd( + f"iot hub generate-sas-token -n {self.entity_name} -g {self.entity_rg} --hostname-type device", + checks=[self.exists("sas")], + ).get_output_in_json() + assert extract_sr(token) == device_hn + else: + # service / device must error on classic hubs + self.cmd( + f"iot hub generate-sas-token -n {self.entity_name} -g {self.entity_rg} --hostname-type service", + expect_failure=True, + ) + self.cmd( + f"iot hub generate-sas-token -n {self.entity_name} -g {self.entity_rg} --hostname-type device", + expect_failure=True, + ) + def test_iothub_connection_string_show(self): conn_str_pattern = r"^HostName={0}(\.\w+)?\.azure-devices\.net;SharedAccessKeyName=iothubowner;SharedAccessKey=".format( self.entity_name diff --git a/azext_iot/tests/iothub/core/test_iothub_utilities_unit.py b/azext_iot/tests/iothub/core/test_iothub_utilities_unit.py index 1cb3dc307..13550207f 100644 --- a/azext_iot/tests/iothub/core/test_iothub_utilities_unit.py +++ b/azext_iot/tests/iothub/core/test_iothub_utilities_unit.py @@ -83,3 +83,123 @@ def test_generate_sas_token_from_cs_error(self, mocker, fixture_cmd, req): cmd=fixture_cmd, connection_string=req["connection_string"], ) + + +class TestHostnameTypeRouting: + """SAS audience routing + CS-show service-hostname rejection.""" + + HUB = "mygwv2hub" + _PRIMARY = generate_generic_id() + _SECONDARY = generate_generic_id() + _DEVICE_PRIMARY = generate_generic_id() + _DEVICE_SECONDARY = generate_generic_id() + TARGET = { + "entity": f"{HUB}.service.azure-devices.net", + "policy": "iothubowner", + "primarykey": _PRIMARY, + "secondarykey": _SECONDARY, + "name": HUB, "subscription": "sub", "resourcegroup": "rg", + "deviceHostName": f"{HUB}.device.azure-devices.net", + "serviceHostName": f"{HUB}.service.azure-devices.net", + "cmd": None, + } + DEVICE = { + "deviceId": "d1", + "authentication": {"type": "sas", "symmetricKey": { + "primaryKey": _DEVICE_PRIMARY, "secondaryKey": _DEVICE_SECONDARY}}, + } + MODULE = {**DEVICE, "moduleId": "m1"} + + @pytest.fixture(autouse=True) + def _patches(self, mocker): + mocker.patch("azext_iot.operations.hub.IotHubDiscovery.get_target", + return_value=dict(self.TARGET)) + mocker.patch("azext_iot.operations.hub._iot_device_show", return_value=self.DEVICE) + mocker.patch("azext_iot.operations.hub._iot_device_module_show", return_value=self.MODULE) + + @staticmethod + def _sr(token): + from urllib.parse import unquote + for part in token["sas"].replace("SharedAccessSignature ", "").split("&"): + if part.startswith("sr="): + return unquote(part[3:]) + raise AssertionError(token["sas"]) + + @pytest.mark.parametrize("scope, hostname_type, expected", [ + # defaults (auto) on GWv2: hub->service, device/module->device + ({}, "auto", "mygwv2hub.service.azure-devices.net"), + ({"device_id": "d1"}, "auto", "mygwv2hub.device.azure-devices.net/devices/d1"), + ({"device_id": "d1", "module_id": "m1"}, "auto", + "mygwv2hub.device.azure-devices.net/devices/d1/modules/m1"), + + ({"device_id": "d1"}, "service", "mygwv2hub.service.azure-devices.net/devices/d1"), + ({"device_id": "d1", "module_id": "m1"}, "service", + "mygwv2hub.service.azure-devices.net/devices/d1/modules/m1"), + ]) + def test_sas_audience(self, fixture_cmd, scope, hostname_type, expected): + token = subject.iot_get_sas_token( + cmd=fixture_cmd, hub_name_or_hostname=self.HUB, + hostname_type=hostname_type, **scope) + assert self._sr(token) == expected + + @pytest.mark.parametrize("op, kwargs", [ + (subject.iot_get_device_connection_string, + {"hub_name_or_hostname": "hub", "device_id": "d1"}), + (subject.iot_get_module_connection_string, + {"hub_name_or_hostname": "hub", "device_id": "d1", "module_id": "m1"}), + ]) + def test_cs_show_rejects_service_hostname(self, fixture_cmd, op, kwargs): + with pytest.raises(CLIError, match="not supported"): + op(cmd=fixture_cmd, hostname_type="service", **kwargs) + + @pytest.mark.parametrize("hostname_type", ["device", "service", "classic"]) + def test_sas_connection_string_rejects_explicit_hostname_type( + self, fixture_cmd, hostname_type + ): + cs = generate_valid_cs(["DeviceId"])["connection_string"] + with pytest.raises(CLIError, match="--connection-string"): + subject.iot_get_sas_token( + cmd=fixture_cmd, + connection_string=cs, + hostname_type=hostname_type, + ) + + @pytest.mark.parametrize("scope, hostname_type, login_host, expected", [ + # login (offline) mode: audience comes from string-transformed CS HostName. + # auto defaults to scope-appropriate hostname. + ({}, "auto", "mygwv2hub.service.azure-devices.net", + "mygwv2hub.service.azure-devices.net"), + ({}, "auto", "mygwv2hub.azure-devices.net", + "mygwv2hub.service.azure-devices.net"), + ({"device_id": "d1"}, "auto", "mygwv2hub.service.azure-devices.net", + "mygwv2hub.device.azure-devices.net/devices/d1"), + ({"device_id": "d1"}, "auto", "mygwv2hub.azure-devices.net", + "mygwv2hub.device.azure-devices.net/devices/d1"), + ({"device_id": "d1", "module_id": "m1"}, "auto", + "mygwv2hub.service.azure-devices.net", + "mygwv2hub.device.azure-devices.net/devices/d1/modules/m1"), + + ({"device_id": "d1"}, "device", "mygwv2hub.azure-devices.net", + "mygwv2hub.device.azure-devices.net/devices/d1"), + ({"device_id": "d1"}, "classic", "mygwv2hub.service.azure-devices.net", + "mygwv2hub.azure-devices.net/devices/d1"), + ({"device_id": "d1", "module_id": "m1"}, "service", + "mygwv2hub.device.azure-devices.net", + "mygwv2hub.service.azure-devices.net/devices/d1/modules/m1"), + ]) + def test_sas_audience_login_mode( + self, mocker, fixture_cmd, scope, hostname_type, login_host, expected + ): + key = generate_generic_id() + target = dict(self.TARGET, entity=login_host, + cs=(f"HostName={login_host};SharedAccessKeyName=iothubowner;" + f"SharedAccessKey={key}")) + mocker.patch("azext_iot.operations.hub.IotHubDiscovery.get_target", + return_value=target) + token = subject.iot_get_sas_token( + cmd=fixture_cmd, + login=target["cs"], + hostname_type=hostname_type, + **scope, + ) + assert self._sr(token) == expected diff --git a/azext_iot/tests/iothub/devices/test_iothub_devices_int.py b/azext_iot/tests/iothub/devices/test_iothub_devices_int.py index 3346cfdaf..4876f27f6 100644 --- a/azext_iot/tests/iothub/devices/test_iothub_devices_int.py +++ b/azext_iot/tests/iothub/devices/test_iothub_devices_int.py @@ -458,3 +458,102 @@ def test_iothub_device_generate_sas_token(self): f"iot hub generate-sas-token -d {device_ids[0]} --login {mixed_case_cstring}", checks=[self.exists("sas")], ) + + def test_iothub_device_hostname_type_permutations(self): + """--hostname-type permutations for device-scope CS-show and SAS. + + - `--hostname-type service` is rejected + - `--hostname-type service` is accepted on SAS (caller opts in with --hostname-type). + """ + from urllib.parse import unquote + + def extract_sr(sas_payload): + sas = sas_payload["sas"].replace("SharedAccessSignature ", "") + for part in sas.split("&"): + if part.startswith("sr="): + return unquote(part[3:]) + raise AssertionError(f"sas token had no sr= component: {sas}") + + hub = self.cmd( + f"iot hub show -n {self.entity_name} -g {self.entity_rg}" + ).get_output_in_json() + props = hub["properties"] + classic_hn = props["hostName"] + device_hn = props.get("deviceHostName") + service_hn = props.get("serviceHostName") + is_gwv2 = bool(device_hn) + + device_id = self.generate_device_names(1)[0] + self.cmd( + f"iot hub device-identity create -d {device_id} -n {self.entity_name} -g {self.entity_rg}" + ) + + # device-identity connection-string show + + self.cmd( + f"iot hub device-identity connection-string show -d {device_id} " + f"-n {self.host_name} -g {self.entity_rg} --hostname-type service", + expect_failure=True, + ) + + # classic + cs_classic = self.cmd( + f"iot hub device-identity connection-string show -d {device_id} " + f"-n {self.host_name} -g {self.entity_rg} --hostname-type classic", + ).get_output_in_json()["connectionString"] + assert f"HostName={classic_hn}" in cs_classic + + if is_gwv2: + # device + cs_device = self.cmd( + f"iot hub device-identity connection-string show -d {device_id} " + f"-n {self.host_name} -g {self.entity_rg} --hostname-type device", + ).get_output_in_json()["connectionString"] + assert f"HostName={device_hn}" in cs_device + + # auto defaults to device + cs_auto = self.cmd( + f"iot hub device-identity connection-string show -d {device_id} " + f"-n {self.host_name} -g {self.entity_rg}", + ).get_output_in_json()["connectionString"] + assert f"HostName={device_hn}" in cs_auto + + # generate-sas-token (device scope) + # classic + token = self.cmd( + f"iot hub generate-sas-token -d {device_id} -n {self.host_name} -g {self.entity_rg} --hostname-type classic", + checks=[self.exists("sas")], + ).get_output_in_json() + assert extract_sr(token) == f"{classic_hn}/devices/{device_id}" + + if is_gwv2: + # device + token = self.cmd( + f"iot hub generate-sas-token -d {device_id} -n {self.host_name} -g {self.entity_rg} --hostname-type device", + checks=[self.exists("sas")], + ).get_output_in_json() + assert extract_sr(token) == f"{device_hn}/devices/{device_id}" + + # service is selectable for device scope + token = self.cmd( + f"iot hub generate-sas-token -d {device_id} -n {self.host_name} -g {self.entity_rg} --hostname-type service", + checks=[self.exists("sas")], + ).get_output_in_json() + assert extract_sr(token) == f"{service_hn}/devices/{device_id}" + + # auto on GWv2 defaults to device + token = self.cmd( + f"iot hub generate-sas-token -d {device_id} -n {self.host_name} -g {self.entity_rg}", + checks=[self.exists("sas")], + ).get_output_in_json() + assert extract_sr(token) == f"{device_hn}/devices/{device_id}" + else: + # device/service hostname types must error on classic hubs (no hostname to resolve) + self.cmd( + f"iot hub generate-sas-token -d {device_id} -n {self.host_name} -g {self.entity_rg} --hostname-type device", + expect_failure=True, + ) + self.cmd( + f"iot hub generate-sas-token -d {device_id} -n {self.host_name} -g {self.entity_rg} --hostname-type service", + expect_failure=True, + ) diff --git a/azext_iot/tests/iothub/modules/test_iothub_modules_int.py b/azext_iot/tests/iothub/modules/test_iothub_modules_int.py index b6d8b09f7..03fad2fe5 100644 --- a/azext_iot/tests/iothub/modules/test_iothub_modules_int.py +++ b/azext_iot/tests/iothub/modules/test_iothub_modules_int.py @@ -439,3 +439,107 @@ def test_iothub_module_generate_sas_token(self): f"iot hub generate-sas-token -m {module_ids[0]} -d {device_ids[0]} --login {mixed_case_cstring}", checks=[self.exists("sas")], ) + + def test_iothub_module_hostname_type_permutations(self): + """--hostname-type permutations for module-scope CS-show and SAS. + + - `--hostname-type service` is rejected on CS-show + - `--hostname-type service` is accepted on SAS (caller opts in; audience targets service endpoint). + - On GWv2 hubs, auto (default) resolves to the device endpoint for both CS-show and SAS. + """ + from urllib.parse import unquote + + def extract_sr(sas_payload): + sas = sas_payload["sas"].replace("SharedAccessSignature ", "") + for part in sas.split("&"): + if part.startswith("sr="): + return unquote(part[3:]) + raise AssertionError(f"sas token had no sr= component: {sas}") + + hub = self.cmd( + f"iot hub show -n {self.entity_name} -g {self.entity_rg}" + ).get_output_in_json() + props = hub["properties"] + classic_hn = props["hostName"] + device_hn = props.get("deviceHostName") + service_hn = props.get("serviceHostName") + is_gwv2 = bool(device_hn) + + device_id = self.generate_device_names(1)[0] + module_id = self.generate_device_names(1)[0] + self.cmd( + f"iot hub device-identity create -d {device_id} -n {self.entity_name} -g {self.entity_rg}" + ) + self.cmd( + f"iot hub module-identity create -m {module_id} -d {device_id} -n {self.entity_name} -g {self.entity_rg}" + ) + + # module-identity connection-string show + + self.cmd( + f"iot hub module-identity connection-string show -m {module_id} -d {device_id} " + f"-n {self.host_name} -g {self.entity_rg} --hostname-type service", + expect_failure=True, + ) + + # classic + cs_classic = self.cmd( + f"iot hub module-identity connection-string show -m {module_id} -d {device_id} " + f"-n {self.host_name} -g {self.entity_rg} --hostname-type classic", + ).get_output_in_json()["connectionString"] + assert f"HostName={classic_hn}" in cs_classic + + if is_gwv2: + cs_device = self.cmd( + f"iot hub module-identity connection-string show -m {module_id} -d {device_id} " + f"-n {self.host_name} -g {self.entity_rg} --hostname-type device", + ).get_output_in_json()["connectionString"] + assert f"HostName={device_hn}" in cs_device + + cs_auto = self.cmd( + f"iot hub module-identity connection-string show -m {module_id} -d {device_id} " + f"-n {self.host_name} -g {self.entity_rg}", + ).get_output_in_json()["connectionString"] + assert f"HostName={device_hn}" in cs_auto + + # generate-sas-token (module scope) + token = self.cmd( + f"iot hub generate-sas-token -m {module_id} -d {device_id} " + f"-n {self.host_name} -g {self.entity_rg} --hostname-type classic", + checks=[self.exists("sas")], + ).get_output_in_json() + assert extract_sr(token) == f"{classic_hn}/devices/{device_id}/modules/{module_id}" + + if is_gwv2: + token = self.cmd( + f"iot hub generate-sas-token -m {module_id} -d {device_id} " + f"-n {self.host_name} -g {self.entity_rg} --hostname-type device", + checks=[self.exists("sas")], + ).get_output_in_json() + assert extract_sr(token) == f"{device_hn}/devices/{device_id}/modules/{module_id}" + + # service is selectable for module scope + token = self.cmd( + f"iot hub generate-sas-token -m {module_id} -d {device_id} " + f"-n {self.host_name} -g {self.entity_rg} --hostname-type service", + checks=[self.exists("sas")], + ).get_output_in_json() + assert extract_sr(token) == f"{service_hn}/devices/{device_id}/modules/{module_id}" + + token = self.cmd( + f"iot hub generate-sas-token -m {module_id} -d {device_id} " + f"-n {self.host_name} -g {self.entity_rg}", + checks=[self.exists("sas")], + ).get_output_in_json() + assert extract_sr(token) == f"{device_hn}/devices/{device_id}/modules/{module_id}" + else: + self.cmd( + f"iot hub generate-sas-token -m {module_id} -d {device_id} " + f"-n {self.host_name} -g {self.entity_rg} --hostname-type device", + expect_failure=True, + ) + self.cmd( + f"iot hub generate-sas-token -m {module_id} -d {device_id} " + f"-n {self.host_name} -g {self.entity_rg} --hostname-type service", + expect_failure=True, + )