Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions HISTORY.rst
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,19 @@
Release History
===============

0.31.0b2 (Preview)
+++++++++++++++

**Bug fixes**

* ``az iot hub device-identity connection-string show`` and ``az iot hub module-identity connection-string show`` now reject ``--hostname-type service``. Use ``auto``, ``device`` or ``classic`` instead.

* ``az iot hub generate-sas-token`` gains a ``--hostname-type`` parameter to control the SAS token audience:

- Hub-level (no ``-d``) defaults to the service hostname.
- Device-level (``-d``) and module-level (``-d -m``) default to the device hostname.
- ``service``, ``device`` and ``classic`` are explicitly selectable.

0.31.0b1 (Preview)
+++++++++++++++

Expand Down
6 changes: 6 additions & 0 deletions azext_iot/_help.py
Original file line number Diff line number Diff line change
Expand Up @@ -689,6 +689,12 @@
text: >
az iot hub generate-sas-token --connection-string
'HostName=myhub.azure-devices.net;DeviceId=mydevice;ModuleId=mymodule;SharedAccessKeyName=iothubowner;SharedAccessKey=12345'
- name: Generate a device SAS token whose audience targets the device endpoint.
text: >
az iot hub generate-sas-token -d {device_id} -n {iothub_name} --hostname-type device
- name: Generate an IoT Hub SAS token whose audience explicitly targets the service endpoint.
text: >
az iot hub generate-sas-token -n {iothub_name} --hostname-type service
"""

helps[
Expand Down
17 changes: 17 additions & 0 deletions azext_iot/_params.py
Original file line number Diff line number Diff line change
Expand Up @@ -380,6 +380,23 @@ def load_arguments(self, _):
"'service' uses the TLS 1.3 service hostname (errors if not GWv2).",
)

with self.argument_context("iot hub generate-sas-token") as context:
context.argument(
"hostname_type",
options_list=["--hostname-type", "--ht"],
arg_type=get_enum_type(HostnameType),
default=HostnameType.AUTO.value,
help="Type of hostname to use as the SAS token audience. "
"'auto' uses the device hostname for device/module-scoped tokens "
"and the service hostname for hub-scoped tokens. "
"'classic' always uses the default hostname. "
"'device' uses the device hostname (errors on non-GWv2 hubs). "
"'service' uses the service hostname (errors on non-GWv2 hubs). "
"This option cannot be combined with --connection-string; when "
"--connection-string is supplied, the SAS audience is derived "
"from its HostName.",
)

with self.argument_context("iot hub job") as context:
context.argument("job_id", options_list=["--job-id"], help="IoT Hub job Id.")
context.argument(
Expand Down
2 changes: 1 addition & 1 deletion azext_iot/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@

import os

VERSION = "0.31.0b1"
VERSION = "0.31.0b2"
EXTENSION_NAME = "azure-iot"
EXTENSION_ROOT = os.path.dirname(os.path.abspath(__file__))
EXTENSION_CONFIG_ROOT_KEY = "iotext"
Expand Down
85 changes: 70 additions & 15 deletions azext_iot/operations/hub.py
Original file line number Diff line number Diff line change
Expand Up @@ -2240,22 +2240,20 @@ def iot_get_sas_token(
module_id=None,
auth_type_dataplane=None,
connection_string=None,
hostname_type=HostnameType.AUTO.value,
):
key_type = key_type.lower()
policy_name = policy_name.lower()

if login and policy_name != "iothubowner":
raise ArgumentUsageError(
"You are unable to change the sas policy with a hub connection string login."
)
if login and key_type != "primary" and not device_id:
raise ArgumentUsageError(
"For non-device sas, you are unable to change the key type with a connection string login."
)
if module_id and not device_id:
raise ArgumentUsageError(
"You are unable to get sas token for module without device information."
)
_validate_iot_get_sas_token_args(
login=login,
policy_name=policy_name,
key_type=key_type,
device_id=device_id,
module_id=module_id,
connection_string=connection_string,
hostname_type=hostname_type,
)

if connection_string:
Comment thread
cheatsheet1999 marked this conversation as resolved.
return {
Expand All @@ -2277,10 +2275,33 @@ def iot_get_sas_token(
resource_group_name,
login,
auth_type_dataplane,
hostname_type,
).generate_sas_token()
}


def _validate_iot_get_sas_token_args(
login, policy_name, key_type, device_id, module_id, connection_string, hostname_type
):
if login and policy_name != "iothubowner":
raise ArgumentUsageError(
"You are unable to change the sas policy with a hub connection string login."
)
if login and key_type != "primary" and not device_id:
raise ArgumentUsageError(
"For non-device sas, you are unable to change the key type with a connection string login."
)
if module_id and not device_id:
raise ArgumentUsageError(
"You are unable to get sas token for module without device information."
)
if connection_string and hostname_type != HostnameType.AUTO.value:
raise ArgumentUsageError(
"--hostname-type is not supported with --connection-string. "
"The SAS audience is derived from the HostName in the supplied connection string."
)


def _iot_build_sas_token_from_cs(connection_string, duration=3600):
uri = None
policy = None
Expand Down Expand Up @@ -2330,6 +2351,7 @@ def _iot_build_sas_token(
resource_group_name=None,
login=None,
auth_type_dataplane=None,
hostname_type=HostnameType.AUTO.value,
):
from azext_iot.common._azure import (
parse_iot_device_connection_string,
Expand All @@ -2352,6 +2374,10 @@ def _iot_build_sas_token(
policy = None
key = None

resolved_host = _resolve_sas_audience(
target, hostname_type, device_id=device_id, login=login
)

if device_id:
logger.info(
'Obtaining device "%s" details from registry, using IoT Hub policy "%s"',
Expand All @@ -2365,7 +2391,7 @@ def _iot_build_sas_token(
entity=module, key_type=key_type
)
uri = "{}/devices/{}/modules/{}".format(
target["entity"], device_id, module_id
resolved_host, device_id, module_id
)
try:
parsed_module_cs = parse_iot_device_module_connection_string(module_cs)
Expand All @@ -2378,7 +2404,7 @@ def _iot_build_sas_token(
device_cs = _build_device_or_module_connection_string(
entity=device, key_type=key_type
)
uri = "{}/devices/{}".format(target["entity"], device_id)
uri = "{}/devices/{}".format(resolved_host, device_id)
try:
parsed_device_cs = parse_iot_device_connection_string(device_cs)
except ValueError as e:
Expand All @@ -2387,7 +2413,7 @@ def _iot_build_sas_token(

key = parsed_device_cs["SharedAccessKey"]
else:
uri = target["entity"]
uri = resolved_host
policy = target["policy"]
key = target["primarykey"] if key_type == "primary" else target["secondarykey"]

Expand All @@ -2410,6 +2436,25 @@ def _transform_hostname(hostname, hostname_type):
return hostname_map.get(hostname_type, hostname)


def _resolve_sas_audience(target, hostname_type, device_id=None, login=None):
"""Resolve the SAS audience host.

Login mode lacks ARM metadata, so the host is string-transformed from the
CS HostName.
"""
auto_tls_key = "deviceHostName" if device_id else "serviceHostName"
if login:
effective_type = hostname_type
if effective_type == HostnameType.AUTO.value:
effective_type = (
HostnameType.DEVICE.value if device_id else HostnameType.SERVICE.value
)
return _transform_hostname(target["entity"], effective_type)
return _resolve_hostname_by_type(
target, hostname_type, auto_tls_key=auto_tls_key
)


def _resolve_hostname_by_type(target, hostname_type, auto_tls_key="deviceHostName"):
classic = _transform_hostname(target["entity"], HostnameType.CLASSIC.value)
if hostname_type == HostnameType.CLASSIC.value:
Expand Down Expand Up @@ -2468,6 +2513,11 @@ def iot_get_device_connection_string(
auth_type_dataplane=None,
hostname_type=HostnameType.AUTO.value,
):
if hostname_type == HostnameType.SERVICE.value:
raise InvalidArgumentValueError(
"Hostname type 'service' is not supported for device connection strings. "
"Use 'auto', 'device', or 'classic' instead."
)
result = {}
discovery = IotHubDiscovery(cmd)
target = discovery.get_target(
Expand Down Expand Up @@ -2498,6 +2548,11 @@ def iot_get_module_connection_string(
auth_type_dataplane=None,
hostname_type=HostnameType.AUTO.value,
):
if hostname_type == HostnameType.SERVICE.value:
raise InvalidArgumentValueError(
"Hostname type 'service' is not supported for module connection strings. "
"Use 'auto', 'device', or 'classic' instead."
)
result = {}
discovery = IotHubDiscovery(cmd)
target = discovery.get_target(
Expand Down
64 changes: 64 additions & 0 deletions azext_iot/tests/iothub/core/test_iothub_utilities_int.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,70 @@ def test_iothub_generate_sas_token(self):
checks=[self.exists("sas")],
)

def test_iothub_generate_sas_token_hostname_type(self):
"""--hostname-type permutations for hub-level SAS.

Verifies the `sr=` audience in the generated SAS token matches the requested hostname type.
"""
from urllib.parse import unquote

def extract_sr(sas_payload):
sas = sas_payload["sas"].replace("SharedAccessSignature ", "")
for part in sas.split("&"):
if part.startswith("sr="):
return unquote(part[3:])
raise AssertionError(f"sas token had no sr= component: {sas}")

hub = self.cmd(
f"iot hub show -n {self.entity_name} -g {self.entity_rg}"
).get_output_in_json()
props = hub["properties"]
classic_hn = props["hostName"]
device_hn = props.get("deviceHostName")
service_hn = props.get("serviceHostName")
is_gwv2 = bool(device_hn and service_hn)

# auto: defaults to service hostname on GWv2, classic on V1
token = self.cmd(
f"iot hub generate-sas-token -n {self.entity_name} -g {self.entity_rg}",
checks=[self.exists("sas")],
).get_output_in_json()
expected_auto = service_hn if is_gwv2 else classic_hn
assert extract_sr(token) == expected_auto, \
f"auto: expected sr={expected_auto}, got {extract_sr(token)}"

# classic
token = self.cmd(
f"iot hub generate-sas-token -n {self.entity_name} -g {self.entity_rg} --hostname-type classic",
checks=[self.exists("sas")],
).get_output_in_json()
assert extract_sr(token) == classic_hn

if is_gwv2:
# service
token = self.cmd(
f"iot hub generate-sas-token -n {self.entity_name} -g {self.entity_rg} --hostname-type service",
checks=[self.exists("sas")],
).get_output_in_json()
assert extract_sr(token) == service_hn

# device
token = self.cmd(
f"iot hub generate-sas-token -n {self.entity_name} -g {self.entity_rg} --hostname-type device",
checks=[self.exists("sas")],
).get_output_in_json()
assert extract_sr(token) == device_hn
else:
# service / device must error on classic hubs
self.cmd(
f"iot hub generate-sas-token -n {self.entity_name} -g {self.entity_rg} --hostname-type service",
expect_failure=True,
)
self.cmd(
f"iot hub generate-sas-token -n {self.entity_name} -g {self.entity_rg} --hostname-type device",
expect_failure=True,
)

def test_iothub_connection_string_show(self):
conn_str_pattern = r"^HostName={0}(\.\w+)?\.azure-devices\.net;SharedAccessKeyName=iothubowner;SharedAccessKey=".format(
self.entity_name
Expand Down
Loading
Loading