Skip to content
17 changes: 3 additions & 14 deletions server/src/uds/core/managers/crypto/certs.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import hashes, serialization
from cryptography.hazmat.primitives.serialization import pkcs7
from cryptography.x509.oid import ExtendedKeyUsageOID

from django.conf import settings

Expand Down Expand Up @@ -115,16 +114,6 @@ def load_system_roots() -> list[x509.Certificate]:
return _system_trust_cache


def _check_leaf_code_signing(leaf: x509.Certificate) -> None:
# mstsc won't accept the .rdp signature without codeSigning EKU on the leaf
try:
eku = leaf.extensions.get_extension_for_class(x509.ExtendedKeyUsage).value
except x509.ExtensionNotFound:
raise ValueError('Leaf missing Extended Key Usage extension (codeSigning required)')
if ExtendedKeyUsageOID.CODE_SIGNING not in eku:
raise ValueError('Leaf missing codeSigning EKU required for RDP signing')


def _verify_issued_by(cert: x509.Certificate, issuer: x509.Certificate, label: str) -> None:
try:
cert.verify_directly_issued_by(issuer)
Expand All @@ -135,7 +124,8 @@ def _verify_issued_by(cert: x509.Certificate, issuer: x509.Certificate, label: s
) from e


def _walk_chain(leaf: x509.Certificate, chain: list[x509.Certificate]) -> None:
def check_chain(leaf: x509.Certificate, chain: list[x509.Certificate]) -> None:
# in-memory variant — used by the PFX path where there's no on-disk chain file
now = datetime.datetime.now(datetime.timezone.utc)
for c in (leaf, *chain):
if not (c.not_valid_before_utc <= now <= c.not_valid_after_utc):
Expand Down Expand Up @@ -180,5 +170,4 @@ def check_cert_chain(cert_chain: pathlib.Path | str) -> None:
certs = load_pem_certificates(cert_chain)
if not certs:
raise ValueError('No certificates found in certificate chain')
_check_leaf_code_signing(certs[0])
_walk_chain(certs[0], certs[1:])
check_chain(certs[0], certs[1:])
11 changes: 7 additions & 4 deletions server/src/uds/core/managers/crypto/rdp.py
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,8 @@ def _check_pubkey_matches_key(cert: x509.Certificate, key: _PrivateKey) -> None:

def _load_cert_key_chain() -> tuple[x509.Certificate, _PrivateKey, list[x509.Certificate]]:
cert_path = _certs.get_server_cert()
cert_data = open(cert_path, 'rb').read()
with open(cert_path, 'rb') as f:
cert_data = f.read()

# try PFX first, it carries key+chain in one file
try:
Expand All @@ -126,15 +127,17 @@ def _load_cert_key_chain() -> tuple[x509.Certificate, _PrivateKey, list[x509.Cer

if p12_cert is not None and p12_key is not None:
key = _ensure_signer_key(p12_key)
_certs.check_cert_chain(cert_path)
chain = list(p12_chain or [])
_certs.check_chain(p12_cert, chain)
_check_pubkey_matches_key(p12_cert, key)
return p12_cert, key, list(p12_chain or [])
return p12_cert, key, chain

certs = _certs.load_certificates_any_format(cert_data)
if not certs:
raise ValueError(f'No certificates found in {cert_path}')

key_data = open(_certs.get_server_key(), 'rb').read()
with open(_certs.get_server_key(), 'rb') as f:
key_data = f.read()
key = _ensure_signer_key(_certs.load_private_key_any_format(key_data))

_certs.check_cert_chain(cert_path)
Expand Down
2 changes: 2 additions & 0 deletions server/tests/core/managers/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,5 +29,7 @@
Author: Adolfo Gómez, dkmaster at dkmon dot com
"""
# pyright: reportUnusedImport=false
from . import test_certs
from . import test_crypto
from . import test_downloads
from . import test_rdp
181 changes: 181 additions & 0 deletions server/tests/core/managers/_cert_factory.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,181 @@
# -*- coding: utf-8 -*-
#
# Copyright (c) 2026 Virtual Cable S.L.
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without modification,
# are permitted provided that the following conditions are met:
#
# * Redistributions of source code must retain the above copyright notice,
# this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above copyright notice,
# this list of conditions and the following disclaimer in the documentation
# and/or other materials provided with the distribution.
# * Neither the name of Virtual Cable S.L. nor the names of its contributors
# may be used to endorse or promote products derived from this software
# without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
"""
Author: Adolfo Gómez, dkmaster at dkmon dot com
"""
# Cert/key helpers for crypto manager tests. Underscore prefix keeps pytest from collecting it.
import datetime
import pathlib
import secrets
import tempfile
import typing

from cryptography import x509
from cryptography.hazmat.primitives import hashes, serialization
from cryptography.hazmat.primitives.asymmetric import ec, rsa
from cryptography.hazmat.primitives.asymmetric.rsa import RSAPrivateKey
from cryptography.x509.oid import NameOID

from django.test import override_settings

from uds.core.managers.crypto import certs as _certs

from ...utils.test import UDSTestCase

_PrivateKey = typing.Union[RSAPrivateKey, ec.EllipticCurvePrivateKey]

# 1024 here only because keygen dominates test runtime; not for production
_TEST_RSA_BITS = 1024


class CertTestCase(UDSTestCase):
_tmpdir: tempfile.TemporaryDirectory[str]
tmp: pathlib.Path

def setUp(self) -> None:
super().setUp()
_certs._system_trust_cache = None # type: ignore[attr-defined]
self._tmpdir = tempfile.TemporaryDirectory()
self.tmp = pathlib.Path(self._tmpdir.name)

def tearDown(self) -> None:
_certs._system_trust_cache = None # type: ignore[attr-defined]
self._tmpdir.cleanup()
super().tearDown()

def write(self, name: str, data: bytes) -> pathlib.Path:
p = self.tmp / name
p.write_bytes(data)
return p

def install_trust(self, *roots: x509.Certificate) -> None:
bundle = self.write('trust.pem', chain_to_pem(*roots))
ov = override_settings(RDP_SIGN_CA_BUNDLE=str(bundle))
ov.enable()
self.addCleanup(ov.disable)


def _name(cn: str) -> x509.Name:
return x509.Name([x509.NameAttribute(NameOID.COMMON_NAME, cn)])


def make_rsa_key(size: int = _TEST_RSA_BITS) -> RSAPrivateKey:
return rsa.generate_private_key(public_exponent=65537, key_size=size)



def build_cert(
subject_name: x509.Name,
subject_key: _PrivateKey,
issuer_name: x509.Name,
issuer_key: _PrivateKey,
*,
is_ca: bool = False,
not_before: typing.Optional[datetime.datetime] = None,
not_after: typing.Optional[datetime.datetime] = None,
) -> x509.Certificate:
now = datetime.datetime.now(datetime.timezone.utc)
nb = not_before or (now - datetime.timedelta(days=1))
na = not_after or (now + datetime.timedelta(days=365))
builder = (
x509.CertificateBuilder()
.subject_name(subject_name)
.issuer_name(issuer_name)
.public_key(subject_key.public_key())
.serial_number(secrets.randbits(63))
.not_valid_before(nb)
.not_valid_after(na)
)
if is_ca:
builder = builder.add_extension(x509.BasicConstraints(ca=True, path_length=None), critical=True)
return builder.sign(issuer_key, hashes.SHA256())


def self_signed(
cn: str,
*,
key: typing.Optional[_PrivateKey] = None,
is_ca: bool = True,
not_before: typing.Optional[datetime.datetime] = None,
not_after: typing.Optional[datetime.datetime] = None,
) -> tuple[x509.Certificate, _PrivateKey]:
key = key or make_rsa_key()
name = _name(cn)
cert = build_cert(name, key, name, key, is_ca=is_ca, not_before=not_before, not_after=not_after)
return cert, key


def issue(
cn: str,
issuer_cert: x509.Certificate,
issuer_key: _PrivateKey,
*,
key: typing.Optional[_PrivateKey] = None,
is_ca: bool = False,
not_before: typing.Optional[datetime.datetime] = None,
not_after: typing.Optional[datetime.datetime] = None,
) -> tuple[x509.Certificate, _PrivateKey]:
key = key or make_rsa_key()
cert = build_cert(
_name(cn),
key,
issuer_cert.subject,
issuer_key,
is_ca=is_ca,
not_before=not_before,
not_after=not_after,
)
return cert, key


def to_pem(cert: x509.Certificate) -> bytes:
return cert.public_bytes(serialization.Encoding.PEM)


def to_der(cert: x509.Certificate) -> bytes:
return cert.public_bytes(serialization.Encoding.DER)


def key_to_pem(key: _PrivateKey) -> bytes:
return key.private_bytes(
serialization.Encoding.PEM,
serialization.PrivateFormat.PKCS8,
serialization.NoEncryption(),
)


def key_to_der(key: _PrivateKey) -> bytes:
return key.private_bytes(
serialization.Encoding.DER,
serialization.PrivateFormat.PKCS8,
serialization.NoEncryption(),
)


def chain_to_pem(*certs: x509.Certificate) -> bytes:
return b''.join(to_pem(c) for c in certs)
Loading
Loading