diff --git a/server/src/uds/core/managers/crypto/certs.py b/server/src/uds/core/managers/crypto/certs.py index 2c6ea0b3d..26171ee12 100644 --- a/server/src/uds/core/managers/crypto/certs.py +++ b/server/src/uds/core/managers/crypto/certs.py @@ -39,7 +39,6 @@ from cryptography.hazmat.backends import default_backend from cryptography.hazmat.primitives import hashes, serialization from cryptography.hazmat.primitives.serialization import pkcs7 -from cryptography.x509.oid import ExtendedKeyUsageOID from django.conf import settings @@ -115,16 +114,6 @@ def load_system_roots() -> list[x509.Certificate]: return _system_trust_cache -def _check_leaf_code_signing(leaf: x509.Certificate) -> None: - # mstsc won't accept the .rdp signature without codeSigning EKU on the leaf - try: - eku = leaf.extensions.get_extension_for_class(x509.ExtendedKeyUsage).value - except x509.ExtensionNotFound: - raise ValueError('Leaf missing Extended Key Usage extension (codeSigning required)') - if ExtendedKeyUsageOID.CODE_SIGNING not in eku: - raise ValueError('Leaf missing codeSigning EKU required for RDP signing') - - def _verify_issued_by(cert: x509.Certificate, issuer: x509.Certificate, label: str) -> None: try: cert.verify_directly_issued_by(issuer) @@ -135,7 +124,8 @@ def _verify_issued_by(cert: x509.Certificate, issuer: x509.Certificate, label: s ) from e -def _walk_chain(leaf: x509.Certificate, chain: list[x509.Certificate]) -> None: +def check_chain(leaf: x509.Certificate, chain: list[x509.Certificate]) -> None: + # in-memory variant — used by the PFX path where there's no on-disk chain file now = datetime.datetime.now(datetime.timezone.utc) for c in (leaf, *chain): if not (c.not_valid_before_utc <= now <= c.not_valid_after_utc): @@ -180,5 +170,4 @@ def check_cert_chain(cert_chain: pathlib.Path | str) -> None: certs = load_pem_certificates(cert_chain) if not certs: raise ValueError('No certificates found in certificate chain') - _check_leaf_code_signing(certs[0]) - _walk_chain(certs[0], certs[1:]) + check_chain(certs[0], certs[1:]) diff --git a/server/src/uds/core/managers/crypto/rdp.py b/server/src/uds/core/managers/crypto/rdp.py index cec6c6b74..9f1241500 100644 --- a/server/src/uds/core/managers/crypto/rdp.py +++ b/server/src/uds/core/managers/crypto/rdp.py @@ -115,7 +115,8 @@ def _check_pubkey_matches_key(cert: x509.Certificate, key: _PrivateKey) -> None: def _load_cert_key_chain() -> tuple[x509.Certificate, _PrivateKey, list[x509.Certificate]]: cert_path = _certs.get_server_cert() - cert_data = open(cert_path, 'rb').read() + with open(cert_path, 'rb') as f: + cert_data = f.read() # try PFX first, it carries key+chain in one file try: @@ -126,15 +127,17 @@ def _load_cert_key_chain() -> tuple[x509.Certificate, _PrivateKey, list[x509.Cer if p12_cert is not None and p12_key is not None: key = _ensure_signer_key(p12_key) - _certs.check_cert_chain(cert_path) + chain = list(p12_chain or []) + _certs.check_chain(p12_cert, chain) _check_pubkey_matches_key(p12_cert, key) - return p12_cert, key, list(p12_chain or []) + return p12_cert, key, chain certs = _certs.load_certificates_any_format(cert_data) if not certs: raise ValueError(f'No certificates found in {cert_path}') - key_data = open(_certs.get_server_key(), 'rb').read() + with open(_certs.get_server_key(), 'rb') as f: + key_data = f.read() key = _ensure_signer_key(_certs.load_private_key_any_format(key_data)) _certs.check_cert_chain(cert_path) diff --git a/server/tests/core/managers/__init__.py b/server/tests/core/managers/__init__.py index c7faa63a8..62941f29c 100644 --- a/server/tests/core/managers/__init__.py +++ b/server/tests/core/managers/__init__.py @@ -29,5 +29,7 @@ Author: Adolfo Gómez, dkmaster at dkmon dot com """ # pyright: reportUnusedImport=false +from . import test_certs from . import test_crypto from . import test_downloads +from . import test_rdp diff --git a/server/tests/core/managers/_cert_factory.py b/server/tests/core/managers/_cert_factory.py new file mode 100644 index 000000000..865d47025 --- /dev/null +++ b/server/tests/core/managers/_cert_factory.py @@ -0,0 +1,181 @@ +# -*- coding: utf-8 -*- +# +# Copyright (c) 2026 Virtual Cable S.L. +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without modification, +# are permitted provided that the following conditions are met: +# +# * Redistributions of source code must retain the above copyright notice, +# this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above copyright notice, +# this list of conditions and the following disclaimer in the documentation +# and/or other materials provided with the distribution. +# * Neither the name of Virtual Cable S.L. nor the names of its contributors +# may be used to endorse or promote products derived from this software +# without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" +# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE +# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE +# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE +# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL +# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR +# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER +# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, +# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. +""" +Author: Adolfo Gómez, dkmaster at dkmon dot com +""" +# Cert/key helpers for crypto manager tests. Underscore prefix keeps pytest from collecting it. +import datetime +import pathlib +import secrets +import tempfile +import typing + +from cryptography import x509 +from cryptography.hazmat.primitives import hashes, serialization +from cryptography.hazmat.primitives.asymmetric import ec, rsa +from cryptography.hazmat.primitives.asymmetric.rsa import RSAPrivateKey +from cryptography.x509.oid import NameOID + +from django.test import override_settings + +from uds.core.managers.crypto import certs as _certs + +from ...utils.test import UDSTestCase + +_PrivateKey = typing.Union[RSAPrivateKey, ec.EllipticCurvePrivateKey] + +# 1024 here only because keygen dominates test runtime; not for production +_TEST_RSA_BITS = 1024 + + +class CertTestCase(UDSTestCase): + _tmpdir: tempfile.TemporaryDirectory[str] + tmp: pathlib.Path + + def setUp(self) -> None: + super().setUp() + _certs._system_trust_cache = None # type: ignore[attr-defined] + self._tmpdir = tempfile.TemporaryDirectory() + self.tmp = pathlib.Path(self._tmpdir.name) + + def tearDown(self) -> None: + _certs._system_trust_cache = None # type: ignore[attr-defined] + self._tmpdir.cleanup() + super().tearDown() + + def write(self, name: str, data: bytes) -> pathlib.Path: + p = self.tmp / name + p.write_bytes(data) + return p + + def install_trust(self, *roots: x509.Certificate) -> None: + bundle = self.write('trust.pem', chain_to_pem(*roots)) + ov = override_settings(RDP_SIGN_CA_BUNDLE=str(bundle)) + ov.enable() + self.addCleanup(ov.disable) + + +def _name(cn: str) -> x509.Name: + return x509.Name([x509.NameAttribute(NameOID.COMMON_NAME, cn)]) + + +def make_rsa_key(size: int = _TEST_RSA_BITS) -> RSAPrivateKey: + return rsa.generate_private_key(public_exponent=65537, key_size=size) + + + +def build_cert( + subject_name: x509.Name, + subject_key: _PrivateKey, + issuer_name: x509.Name, + issuer_key: _PrivateKey, + *, + is_ca: bool = False, + not_before: typing.Optional[datetime.datetime] = None, + not_after: typing.Optional[datetime.datetime] = None, +) -> x509.Certificate: + now = datetime.datetime.now(datetime.timezone.utc) + nb = not_before or (now - datetime.timedelta(days=1)) + na = not_after or (now + datetime.timedelta(days=365)) + builder = ( + x509.CertificateBuilder() + .subject_name(subject_name) + .issuer_name(issuer_name) + .public_key(subject_key.public_key()) + .serial_number(secrets.randbits(63)) + .not_valid_before(nb) + .not_valid_after(na) + ) + if is_ca: + builder = builder.add_extension(x509.BasicConstraints(ca=True, path_length=None), critical=True) + return builder.sign(issuer_key, hashes.SHA256()) + + +def self_signed( + cn: str, + *, + key: typing.Optional[_PrivateKey] = None, + is_ca: bool = True, + not_before: typing.Optional[datetime.datetime] = None, + not_after: typing.Optional[datetime.datetime] = None, +) -> tuple[x509.Certificate, _PrivateKey]: + key = key or make_rsa_key() + name = _name(cn) + cert = build_cert(name, key, name, key, is_ca=is_ca, not_before=not_before, not_after=not_after) + return cert, key + + +def issue( + cn: str, + issuer_cert: x509.Certificate, + issuer_key: _PrivateKey, + *, + key: typing.Optional[_PrivateKey] = None, + is_ca: bool = False, + not_before: typing.Optional[datetime.datetime] = None, + not_after: typing.Optional[datetime.datetime] = None, +) -> tuple[x509.Certificate, _PrivateKey]: + key = key or make_rsa_key() + cert = build_cert( + _name(cn), + key, + issuer_cert.subject, + issuer_key, + is_ca=is_ca, + not_before=not_before, + not_after=not_after, + ) + return cert, key + + +def to_pem(cert: x509.Certificate) -> bytes: + return cert.public_bytes(serialization.Encoding.PEM) + + +def to_der(cert: x509.Certificate) -> bytes: + return cert.public_bytes(serialization.Encoding.DER) + + +def key_to_pem(key: _PrivateKey) -> bytes: + return key.private_bytes( + serialization.Encoding.PEM, + serialization.PrivateFormat.PKCS8, + serialization.NoEncryption(), + ) + + +def key_to_der(key: _PrivateKey) -> bytes: + return key.private_bytes( + serialization.Encoding.DER, + serialization.PrivateFormat.PKCS8, + serialization.NoEncryption(), + ) + + +def chain_to_pem(*certs: x509.Certificate) -> bytes: + return b''.join(to_pem(c) for c in certs) diff --git a/server/tests/core/managers/test_certs.py b/server/tests/core/managers/test_certs.py new file mode 100644 index 000000000..7d5243f6c --- /dev/null +++ b/server/tests/core/managers/test_certs.py @@ -0,0 +1,216 @@ +# -*- coding: utf-8 -*- +# +# Copyright (c) 2026 Virtual Cable S.L. +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without modification, +# are permitted provided that the following conditions are met: +# +# * Redistributions of source code must retain the above copyright notice, +# this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above copyright notice, +# this list of conditions and the following disclaimer in the documentation +# and/or other materials provided with the distribution. +# * Neither the name of Virtual Cable S.L. nor the names of its contributors +# may be used to endorse or promote products derived from this software +# without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" +# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE +# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE +# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE +# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL +# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR +# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER +# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, +# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +""" +Author: Adolfo Gómez, dkmaster at dkmon dot com +""" +import datetime + +from cryptography import x509 +from cryptography.hazmat.primitives import serialization +from cryptography.hazmat.primitives.serialization import pkcs7 + +from django.test import override_settings + +from uds.core.managers.crypto import certs + +from . import _cert_factory as cf + + +class CertsTest(cf.CertTestCase): + # ------------------------------------------------------------------ paths + @override_settings(RDP_SIGN_CERT='/custom/cert.pem', RDP_SIGN_KEY='/custom/key.pem') + def test_get_server_cert_override(self) -> None: + self.assertEqual(certs.get_server_cert(), '/custom/cert.pem') + self.assertEqual(certs.get_server_key(), '/custom/key.pem') + + # ------------------------------------------------------------------ loaders + def test_load_certificates_all_formats(self) -> None: + root, _ = cf.self_signed('ROOT') + leaf, _ = cf.self_signed('LEAF', is_ca=False) + + cases: list[tuple[str, bytes, set[object]]] = [ + ('PEM chain', cf.chain_to_pem(leaf, root), {leaf.subject, root.subject}), + ('DER single', cf.to_der(leaf), {leaf.subject}), + ('PKCS7 PEM', pkcs7.serialize_certificates([leaf, root], serialization.Encoding.PEM), {leaf.subject, root.subject}), + ('PKCS7 DER', pkcs7.serialize_certificates([leaf, root], serialization.Encoding.DER), {leaf.subject, root.subject}), + ] + for label, blob, expected in cases: + with self.subTest(format=label): + loaded = certs.load_certificates_any_format(blob) + self.assertEqual({c.subject for c in loaded}, expected) + + def test_load_certificates_invalid_raises(self) -> None: + with self.assertRaises(ValueError): + certs.load_certificates_any_format(b'not a cert') + + def test_load_private_key_pem_and_der(self) -> None: + _, key = cf.self_signed('K') + for label, blob in (('PEM', cf.key_to_pem(key)), ('DER', cf.key_to_der(key))): + with self.subTest(format=label): + loaded = certs.load_private_key_any_format(blob) + self.assertEqual( + loaded.public_key().public_numbers(), key.public_key().public_numbers() + ) + + def test_load_private_key_invalid_raises(self) -> None: + with self.assertRaises(ValueError): + certs.load_private_key_any_format(b'not a key') + + def test_load_pem_certificates_reads_file(self) -> None: + cert, _ = cf.self_signed('FILE') + path = self.write('c.pem', cf.to_pem(cert)) + for label, arg in (('str', str(path)), ('Path', path)): + with self.subTest(arg_type=label): + loaded = certs.load_pem_certificates(arg) + self.assertEqual(loaded[0].subject, cert.subject) + + # ------------------------------------------------------------------ system roots + def test_load_system_roots_uses_override(self) -> None: + root, _ = cf.self_signed('OVERRIDE-ROOT') + bundle = self.write('ca.pem', cf.to_pem(root)) + with override_settings(RDP_SIGN_CA_BUNDLE=str(bundle)): + loaded = certs.load_system_roots() + self.assertEqual([c.subject for c in loaded], [root.subject]) + + def test_load_system_roots_caches_result(self) -> None: + root, _ = cf.self_signed('CACHED-ROOT') + bundle = self.write('ca.pem', cf.to_pem(root)) + with override_settings(RDP_SIGN_CA_BUNDLE=str(bundle)): + first = certs.load_system_roots() + bundle.unlink() + second = certs.load_system_roots() + self.assertIs(first, second) + + def test_load_system_roots_missing_bundle_returns_empty(self) -> None: + with override_settings(RDP_SIGN_CA_BUNDLE=str(self.tmp / 'does-not-exist.pem')): + loaded = certs.load_system_roots() + self.assertEqual(loaded, []) + + # ------------------------------------------------------------------ chain validation + def test_check_cert_chain_self_signed_leaf(self) -> None: + leaf_cert, _ = cf.self_signed('selfsigned-leaf', is_ca=False) + path = self.write('leaf.pem', cf.to_pem(leaf_cert)) + with override_settings(RDP_SIGN_CA_BUNDLE=str(self.tmp / 'missing.pem')): + certs.check_cert_chain(path) # no raise + + def test_check_cert_chain_valid_with_intermediate(self) -> None: + root_cert, root_key = cf.self_signed('ROOT') + inter_cert, inter_key = cf.issue('INTER', root_cert, root_key, is_ca=True) + leaf_cert, _ = cf.issue('LEAF', inter_cert, inter_key) + self.install_trust(root_cert) + path = self.write('chain.pem', cf.chain_to_pem(leaf_cert, inter_cert)) + certs.check_cert_chain(path) + + def test_check_cert_chain_root_bundled_anchors(self) -> None: + root_cert, root_key = cf.self_signed('ROOT') + inter_cert, inter_key = cf.issue('INTER', root_cert, root_key, is_ca=True) + leaf_cert, _ = cf.issue('LEAF', inter_cert, inter_key) + with override_settings(RDP_SIGN_CA_BUNDLE=str(self.tmp / 'missing.pem')): + path = self.write('full.pem', cf.chain_to_pem(leaf_cert, inter_cert, root_cert)) + certs.check_cert_chain(path) + + def test_check_cert_chain_incomplete_raises(self) -> None: + root_cert, root_key = cf.self_signed('ROOT') + inter_cert, inter_key = cf.issue('INTER', root_cert, root_key, is_ca=True) + leaf_cert, _ = cf.issue('LEAF', inter_cert, inter_key) + with override_settings(RDP_SIGN_CA_BUNDLE=str(self.tmp / 'missing.pem')): + path = self.write('leaf-only.pem', cf.to_pem(leaf_cert)) + with self.assertRaises(ValueError) as ctx: + certs.check_cert_chain(path) + self.assertIn('Incomplete chain', str(ctx.exception)) + + def test_check_cert_chain_expired_raises(self) -> None: + now = datetime.datetime.now(datetime.timezone.utc) + expired_cert, _ = cf.self_signed( + 'EXPIRED', + not_before=now - datetime.timedelta(days=30), + not_after=now - datetime.timedelta(days=1), + ) + path = self.write('exp.pem', cf.to_pem(expired_cert)) + with self.assertRaises(ValueError) as ctx: + certs.check_cert_chain(path) + self.assertIn('expired or not yet valid', str(ctx.exception)) + + def test_check_cert_chain_empty_file_raises(self) -> None: + path = self.write('empty.pem', b'') + with self.assertRaises(ValueError): + certs.check_cert_chain(path) + + def test_check_chain_in_memory_ok(self) -> None: + root_cert, root_key = cf.self_signed('ROOT') + inter_cert, inter_key = cf.issue('INTER', root_cert, root_key, is_ca=True) + leaf_cert, _ = cf.issue('LEAF', inter_cert, inter_key) + self.install_trust(root_cert) + certs.check_chain(leaf_cert, [inter_cert]) + + def test_check_chain_in_memory_incomplete_raises(self) -> None: + root_cert, root_key = cf.self_signed('ROOT') + inter_cert, inter_key = cf.issue('INTER', root_cert, root_key, is_ca=True) + leaf_cert, _ = cf.issue('LEAF', inter_cert, inter_key) + with override_settings(RDP_SIGN_CA_BUNDLE=str(self.tmp / 'missing.pem')): + with self.assertRaises(ValueError): + certs.check_chain(leaf_cert, []) + + def test_check_chain_not_yet_valid_raises(self) -> None: + now = datetime.datetime.now(datetime.timezone.utc) + future_cert, _ = cf.self_signed( + 'FUTURE', + not_before=now + datetime.timedelta(days=1), + not_after=now + datetime.timedelta(days=365), + ) + path = self.write('fut.pem', cf.to_pem(future_cert)) + with self.assertRaises(ValueError) as ctx: + certs.check_cert_chain(path) + self.assertIn('expired or not yet valid', str(ctx.exception)) + + def test_check_chain_broken_signature_raises(self) -> None: + # intermediate names ROOT as issuer but was signed by some other key + root_cert, _ = cf.self_signed('ROOT') + rogue_key = cf.make_rsa_key() + inter_cert, inter_key = cf.issue('INTER', root_cert, rogue_key, is_ca=True) + leaf_cert, _ = cf.issue('LEAF', inter_cert, inter_key) + self.install_trust(root_cert) + with self.assertRaises(ValueError) as ctx: + certs.check_chain(leaf_cert, [inter_cert]) + self.assertIn('signature invalid', str(ctx.exception)) + + def test_check_chain_depth_exceeded_raises(self) -> None: + # chain longer than _MAX_CHAIN_DEPTH with no anchor in trust + root_cert, root_key = cf.self_signed('R') + prev_cert, prev_key = root_cert, root_key + intermediates: list[x509.Certificate] = [] + for i in range(certs._MAX_CHAIN_DEPTH + 2): + c, k = cf.issue(f'I{i}', prev_cert, prev_key, is_ca=True) + intermediates.append(c) + prev_cert, prev_key = c, k + leaf, _ = cf.issue('LEAF', prev_cert, prev_key) + with override_settings(RDP_SIGN_CA_BUNDLE=str(self.tmp / 'missing.pem')): + with self.assertRaises(ValueError) as ctx: + certs.check_chain(leaf, intermediates) + self.assertIn('Chain depth exceeded', str(ctx.exception)) diff --git a/server/tests/core/managers/test_rdp.py b/server/tests/core/managers/test_rdp.py new file mode 100644 index 000000000..6ed1e59ae --- /dev/null +++ b/server/tests/core/managers/test_rdp.py @@ -0,0 +1,242 @@ +# -*- coding: utf-8 -*- +# +# Copyright (c) 2026 Virtual Cable S.L. +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without modification, +# are permitted provided that the following conditions are met: +# +# * Redistributions of source code must retain the above copyright notice, +# this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above copyright notice, +# this list of conditions and the following disclaimer in the documentation +# and/or other materials provided with the distribution. +# * Neither the name of Virtual Cable S.L. nor the names of its contributors +# may be used to endorse or promote products derived from this software +# without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" +# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE +# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE +# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE +# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL +# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR +# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER +# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, +# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +""" +Author: Adolfo Gómez, dkmaster at dkmon dot com +""" +import base64 +import struct + +from cryptography.hazmat.primitives import serialization +from cryptography.hazmat.primitives.asymmetric import dsa, ec, ed25519 +from cryptography.hazmat.primitives.serialization import pkcs12 + +from django.test import override_settings + +from uds.core.managers.crypto import rdp + +from . import _cert_factory as cf + + +class RdpTest(cf.CertTestCase): + # ------------------------------------------------------------------ signer key guard + def test_ensure_signer_key_rejects_unsupported(self) -> None: + ed_key = ed25519.Ed25519PrivateKey.generate() + dsa_key = dsa.generate_private_key(key_size=2048) + for label, k in (('ed25519', ed_key), ('dsa', dsa_key)): + with self.subTest(kind=label): + with self.assertRaises(ValueError) as ctx: + rdp._ensure_signer_key(k) + self.assertIn('Unsupported private key type', str(ctx.exception)) + + def test_ensure_signer_key_accepts_rsa_and_ec(self) -> None: + rsa_key = cf.make_rsa_key() + ec_key = ec.generate_private_key(ec.SECP256R1()) + for label, k in (('rsa', rsa_key), ('ec', ec_key)): + with self.subTest(kind=label): + self.assertIs(rdp._ensure_signer_key(k), k) + + # ------------------------------------------------------------------ pubkey match + def test_pubkey_match_ok(self) -> None: + cert, key = cf.self_signed('MATCH', is_ca=False) + rdp._check_pubkey_matches_key(cert, key) + + def test_pubkey_match_mismatch_raises(self) -> None: + cert, _ = cf.self_signed('MATCH', is_ca=False) + other_key = cf.make_rsa_key() + with self.assertRaises(ValueError) as ctx: + rdp._check_pubkey_matches_key(cert, other_key) + self.assertIn('does not match', str(ctx.exception)) + + # ------------------------------------------------------------------ sign_rdp_settings + def test_sign_rdp_settings_preserves_declared_order(self) -> None: + cert, key = cf.self_signed('SIGNER', is_ca=False) + lines = [ + 'audiomode:i:0', + 'full address:s:host.example.com', + 'server port:i:3389', + ] + sig_b64, signnames = rdp.sign_rdp_settings(lines, cert=cert, key=key, chain=[]) + self.assertEqual(signnames, ['Full Address', 'Server Port', 'AudioMode']) + + raw = base64.b64decode(sig_b64) + self.assertGreater(len(raw), 12) + magic, one, siglen = struct.unpack(' None: + cert, key = cf.self_signed('SIGNER', is_ca=False) + lines = ['unknown:s:whatever', 'full address:s:host'] + _, signnames = rdp.sign_rdp_settings(lines, cert=cert, key=key, chain=[]) + self.assertEqual(signnames, ['Full Address']) + + # ------------------------------------------------------------------ sign_rdp + def test_sign_rdp_appends_signscope_and_signature(self) -> None: + cert, key = cf.self_signed('SIGNER', is_ca=False) + rdp_text = 'full address:s:host.example.com\r\nserver port:i:3389\r\n' + signed = rdp.sign_rdp(rdp_text, cert=cert, key=key, chain=[]) + out_lines = signed.splitlines() + signscope = next(l for l in out_lines if l.startswith('signscope:s:')) + signature = next(l for l in out_lines if l.startswith('signature:s:')) + self.assertIn('Full Address', signscope) + self.assertIn('Server Port', signscope) + self.assertTrue(signature.startswith('signature:s:')) + base64.b64decode(signature[len('signature:s:'):]) + + def test_sign_rdp_strips_previous_signature_lines(self) -> None: + cert, key = cf.self_signed('SIGNER', is_ca=False) + rdp_text = ( + 'full address:s:host\r\n' + 'signscope:s:Full Address\r\n' + 'signature:s:OLDBOGUS==\r\n' + ) + signed = rdp.sign_rdp(rdp_text, cert=cert, key=key, chain=[]) + self.assertEqual(signed.count('signature:s:'), 1) + self.assertEqual(signed.count('signscope:s:'), 1) + self.assertNotIn('OLDBOGUS', signed) + + def test_sign_rdp_mirrors_full_address_to_alternate(self) -> None: + cert, key = cf.self_signed('SIGNER', is_ca=False) + rdp_text = 'full address:s:host.example.com\r\n' + signed = rdp.sign_rdp(rdp_text, cert=cert, key=key, chain=[]) + self.assertIn('alternate full address:s:host.example.com', signed) + signscope = next(l for l in signed.splitlines() if l.startswith('signscope:s:')) + self.assertIn('Alternate Full Address', signscope) + + def test_sign_rdp_keeps_existing_alternate(self) -> None: + cert, key = cf.self_signed('SIGNER', is_ca=False) + rdp_text = ( + 'full address:s:primary.example.com\r\n' + 'alternate full address:s:backup.example.com\r\n' + ) + signed = rdp.sign_rdp(rdp_text, cert=cert, key=key, chain=[]) + self.assertIn('alternate full address:s:backup.example.com', signed) + self.assertNotIn('alternate full address:s:primary.example.com', signed) + + # ------------------------------------------------------------------ _load_cert_key_chain + def test_load_cert_key_chain_pem_pair(self) -> None: + root_cert, root_key = cf.self_signed('ROOT') + inter_cert, inter_key = cf.issue('INTER', root_cert, root_key, is_ca=True) + leaf_cert, leaf_key = cf.issue('LEAF', inter_cert, inter_key) + self.install_trust(root_cert) + + cert_path = self.write('cert.pem', cf.chain_to_pem(leaf_cert, inter_cert)) + key_path = self.write('key.pem', cf.key_to_pem(leaf_key)) + + with override_settings(RDP_SIGN_CERT=str(cert_path), RDP_SIGN_KEY=str(key_path)): + loaded_leaf, loaded_key, loaded_chain = rdp._load_cert_key_chain() + + self.assertEqual(loaded_leaf.subject, leaf_cert.subject) + self.assertEqual( + loaded_key.public_key().public_numbers(), leaf_key.public_key().public_numbers() + ) + self.assertEqual([c.subject for c in loaded_chain], [inter_cert.subject]) + + def test_load_cert_key_chain_pkcs12(self) -> None: + root_cert, root_key = cf.self_signed('ROOT') + inter_cert, inter_key = cf.issue('INTER', root_cert, root_key, is_ca=True) + leaf_cert, leaf_key = cf.issue('LEAF', inter_cert, inter_key) + self.install_trust(root_cert) + + pfx_bytes = pkcs12.serialize_key_and_certificates( + name=b'leaf', + key=leaf_key, + cert=leaf_cert, + cas=[inter_cert], + encryption_algorithm=serialization.NoEncryption(), + ) + pfx_path = self.write('bundle.pfx', pfx_bytes) + + with override_settings(RDP_SIGN_CERT=str(pfx_path), RDP_SIGN_KEY='/does/not/matter'): + loaded_leaf, loaded_key, loaded_chain = rdp._load_cert_key_chain() + + self.assertEqual(loaded_leaf.subject, leaf_cert.subject) + self.assertEqual( + loaded_key.public_key().public_numbers(), leaf_key.public_key().public_numbers() + ) + self.assertEqual([c.subject for c in loaded_chain], [inter_cert.subject]) + + def test_load_cert_key_chain_empty_pem_raises(self) -> None: + cert_path = self.write('empty.pem', b'') + key_path = self.write('key.pem', cf.key_to_pem(cf.make_rsa_key())) + with override_settings(RDP_SIGN_CERT=str(cert_path), RDP_SIGN_KEY=str(key_path)): + with self.assertRaises(ValueError): + rdp._load_cert_key_chain() + + def test_sign_rdp_settings_includes_chain_in_pkcs7(self) -> None: + # mstsc needs intermediates inside the SignedData blob to build a path + from cryptography.hazmat.primitives.serialization import pkcs7 as _pkcs7 + root_cert, root_key = cf.self_signed('ROOT') + inter_cert, inter_key = cf.issue('INTER', root_cert, root_key, is_ca=True) + leaf_cert, leaf_key = cf.issue('LEAF', inter_cert, inter_key) + + sig_b64, _ = rdp.sign_rdp_settings( + ['full address:s:host'], cert=leaf_cert, key=leaf_key, chain=[inter_cert] + ) + raw = base64.b64decode(sig_b64) + # skip rdpsign 12-byte header to get raw DER PKCS7 + embedded = _pkcs7.load_der_pkcs7_certificates(raw[12:]) + subjects = {c.subject for c in embedded} + self.assertIn(leaf_cert.subject, subjects) + self.assertIn(inter_cert.subject, subjects) + + def test_sign_rdp_with_ec_key(self) -> None: + ec_key = ec.generate_private_key(ec.SECP256R1()) + from cryptography import x509 + from cryptography.hazmat.primitives import hashes + from cryptography.x509.oid import NameOID + import datetime as _dt + now = _dt.datetime.now(_dt.timezone.utc) + name = x509.Name([x509.NameAttribute(NameOID.COMMON_NAME, 'EC-SIGNER')]) + cert = ( + x509.CertificateBuilder() + .subject_name(name).issuer_name(name) + .public_key(ec_key.public_key()) + .serial_number(1) + .not_valid_before(now - _dt.timedelta(days=1)) + .not_valid_after(now + _dt.timedelta(days=30)) + .sign(ec_key, hashes.SHA256()) + ) + signed = rdp.sign_rdp('full address:s:host\r\n', cert=cert, key=ec_key, chain=[]) + self.assertIn('signature:s:', signed) + + def test_load_cert_key_chain_key_mismatch_raises(self) -> None: + root_cert, root_key = cf.self_signed('ROOT') + leaf_cert, _ = cf.issue('LEAF', root_cert, root_key) + self.install_trust(root_cert) + + wrong_key = cf.make_rsa_key() + cert_path = self.write('cert.pem', cf.to_pem(leaf_cert)) + key_path = self.write('key.pem', cf.key_to_pem(wrong_key)) + + with override_settings(RDP_SIGN_CERT=str(cert_path), RDP_SIGN_KEY=str(key_path)): + with self.assertRaises(ValueError) as ctx: + rdp._load_cert_key_chain() + self.assertIn('does not match', str(ctx.exception))