Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 32 additions & 0 deletions src/ssh/azext_ssh/_help.py
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,38 @@
az ssh cert --file ./id_rsa-aadcert.pub --ssh-client-folder "C:\\Program Files\\OpenSSH"
"""

helps['ssh cert-create'] = """
type: command
short-summary: Create a short-lived SSH certificate signed by a private CA key in Azure Key Vault.
long-summary: |
Generates an ephemeral SSH key pair, determines the caller's RBAC role
on the target ProvisionedMachine resource via PIM-based JIT access, and
sends the public key along with metadata (userPublicKey, username, role,
expiry) to Key Vault for signing.

The user's role is NOT taken as input — it is resolved automatically from
the RBAC role assignment on the device resource.

Currently the extension relies on the built-in Azure roles (Owner,
Contributor, Reader) because we do not yet have permission to create
custom roles. The final intended roles are:
- Provisioned Machine Administrator (full SSH with sudo)
- Provisioned Machine Contributor (SSH without sudo)
- Provisioned Machine Reader (view-only; SSH restricted on device)
Certificates are generated for all roles — access restrictions are
enforced on the device side, not by the CLI.
These custom roles are pending creation (Teodora, Eric — please help
finalize so the CLI extension can be completed).

The user identity is derived automatically from the Entra login context.
The certificate expiry is derived from the PIM activation's remaining duration.
Returns the signed SSH user certificate and the freshly generated private key.
examples:
- name: Create a certificate (expiry derived from PIM activation)
text: |
az ssh cert-create --vault-name myKeyVault --resource-id /subscriptions/.../providers/Microsoft.ProvisionedMachine/machines/myDevice
"""

helps['ssh arc'] = """
type: command
short-summary: SSH into Azure Arc Servers
Expand Down
10 changes: 10 additions & 0 deletions src/ssh/azext_ssh/_params.py
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,16 @@ def load_arguments(self, _):
help='Folder path that contains ssh executables (ssh.exe, ssh-keygen.exe, etc). '
'Default to ssh pre-installed if not provided.')

with self.argument_context('ssh cert-create') as c:
c.argument('vault_name', options_list=['--vault-name', '-v'],
help='Name of the Azure Key Vault that holds the private CA signing key (ssh-ca).',
required=True)
c.argument('resource_id', options_list=['--resource-id', '-r'],
help='Fully qualified ARM resource ID of the ProvisionedMachine. '
'Used to determine the user\'s RBAC role (Reader/Contributor/Admin) '
'via PIM-based JIT access.',
required=True)

with self.argument_context('ssh arc') as c:
c.argument('vm_name', options_list=['--vm-name', '--name', '-n'], help='The name of the Arc Server')
c.argument('public_key_file', options_list=['--public-key-file', '-p'], help='The RSA public key file path')
Expand Down
1 change: 1 addition & 0 deletions src/ssh/azext_ssh/commands.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,3 +11,4 @@ def load_command_table(self, _):
g.custom_command('config', 'ssh_config')
g.custom_command('cert', 'ssh_cert')
g.custom_command('arc', 'ssh_arc')
g.custom_command('cert-create', 'ssh_cert_create')
93 changes: 93 additions & 0 deletions src/ssh/azext_ssh/custom.py
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,99 @@ def ssh_arc(cmd, resource_group_name=None, vm_name=None, public_key_file=None, p
resource_type, ssh_proxy_folder, winrdp, yes_without_prompt, ssh_args)


def ssh_cert_create(cmd, vault_name, resource_id):
"""Create a short-lived SSH certificate signed by a private CA in Key Vault.

Step 1 - Prepare certificate metadata:
Generate ephemeral key pair, resolve PIM role, build metadata:
{ user_public_key, username, role, expiry }
Expiry is derived from the PIM activation's remaining duration.

Step 2 - Sign via Key Vault:
Send signing request to Key Vault Sign API using az login context.
CA private key never leaves Key Vault.

Step 3 - Return to user:
Signed SSH user certificate + freshly generated user_private key.
"""
from . import provisioned_machine_utils as pm

telemetry.set_command_details('ssh cert-create')

# Validate inputs.
pm.validate_resource_id(resource_id)
pm.validate_vault_name(vault_name)

private_key_path = None
cert_path = None
try:
# -- Step 1: Prepare certificate metadata --------------------------
# Derive username from az login (Entra) context.
username = pm.get_current_user_principal(cmd)
logger.info("Derived username: %s", username)

# Verify the user has an active PIM assignment (JIT activated).
# startTime/endTime are derived from the PIM activation window.
_pim_instances, start_time, end_time = pm.check_pim_eligibility(cmd, resource_id)
logger.info("PIM eligibility confirmed for resource: %s (valid %s to %s)",
resource_id, start_time, end_time)

# Resolve role from PIM assignment on the ProvisionedMachine resource.
# Reader role is blocked — only Contributor and Administrator can
# generate SSH certificates.
role = pm.resolve_user_role(cmd, resource_id)
logger.info("Resolved PIM role: %s", role)

# Determine which certificate types this role can generate.
role_perms = pm.ROLE_PERMISSIONS.get(role, {})
cert_types = role_perms.get("certificate_types", [])
logger.info("Allowed certificate types for %s: %s", role, cert_types)

# Generate fresh ephemeral SSH key pair.
private_key_path, public_key_path = pm.generate_ephemeral_keypair()
with open(public_key_path, "r", encoding="utf-8") as f:
user_public_key = f.read().strip()

certificate_metadata = {
"userPublicKey": user_public_key,
"username": username,
"role": role,
"startTime": start_time,
"endTime": end_time,
}

# -- Step 2: Sign via Key Vault ------------------------------------
# AZ CLI sends signing request using az login context.
# CA private key never leaves Key Vault.
signed_certificate = pm.sign_certificate_metadata(
cmd, vault_name, certificate_metadata
)
cert_path = signed_certificate["certificatePath"]

# -- Step 3: Return to user ----------------------------------------
result = {
"privateKeyPath": private_key_path,
"certificatePath": cert_path,
}

print_styled_text((Style.SUCCESS,
f"SSH certificate created successfully.\n"
f" Private key : {private_key_path}\n"
f" Certificate : {cert_path}"))

logger.warning("The private key at %s is sensitive. "
"Delete it once the certificate expires.",
os.path.dirname(private_key_path))

telemetry.set_success()
return result

except Exception:
# Clean up sensitive ephemeral files on failure.
pm.cleanup_ephemeral_files(private_key_path, cert_path)
raise


def _do_ssh_op(cmd, op_info, op_call):
# Get ssh_ip before getting public key to avoid getting "ResourceNotFound" exception after creating the keys
if not op_info.is_arc():
Expand Down
Loading