diff --git a/barbican/plugin/crypto/sap_kms_plugin.py b/barbican/plugin/crypto/sap_kms_plugin.py new file mode 100644 index 000000000..5553fa40f --- /dev/null +++ b/barbican/plugin/crypto/sap_kms_plugin.py @@ -0,0 +1,207 @@ +# Copyright (c) 2025 SAP SE +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import base64 +import json +import os +from oslo_log import log as logging +from cryptography.fernet import Fernet +from barbican.plugin.crypto import base as cbase + +LOG = logging.getLogger(__name__) +LOG.debug("Initializing SAPKMSCryptoPlugin module: %s", __name__) + + +class SAPKMSCryptoPlugin(cbase.CryptoPluginBase): + """Minimal Fernet-based CryptoPlugin for demo/dev. + + * encrypt()/decrypt(): uses a single Fernet key from env/config (per-process) + * generate_symmetric(): returns random key material (bytes) encrypted with Fernet + * bind_kek_metadata(): records a label per project (no real per-project KEK) + + WARNING: This is for local testing only. + """ + + def __init__(self): + super().__init__() + LOG.debug("SAPKMSCryptoPlugin instance created") + + # ---- Helper: get Fernet from env/config ---- + def _fernet(self, gtid_scoped_kek) -> Fernet: + LOG.debug("Preparing Fernet instance (gtid_scoped_kek provided=%s)", bool(gtid_scoped_kek)) + key = gtid_scoped_kek or os.environ.get("SAP_KMS_FERNET_KEY") + if not key: + # Generate ephemeral key for dev only; do not log the key material. + key = Fernet.generate_key().decode("utf-8") + os.environ["SAP_KMS_FERNET_KEY"] = key + LOG.warning("No SAP_KMS_FERNET_KEY configured; generated an ephemeral key (INSECURE FOR PRODUCTION)") + try: + f = Fernet(key.encode("utf-8")) + LOG.debug("Fernet instance initialized successfully") + return f + except Exception: + LOG.exception("Failed to initialize Fernet instance") + raise + + # --- Metadata helpers --- + def _load_sap_gtid_to_kek_mapping(self, sap_gtid: str) -> dict: + LOG.debug("Loading SAP GTID to KEK mapping for sap_gtid=%s", sap_gtid) + # 1) file mapping (optional) + path = os.environ.get("PROJECT_METADATA_FILE") + if path and os.path.exists(path): + try: + LOG.debug("Attempting to load PROJECT_METADATA_FILE from %s", path) + with open(path, "r") as fh: + data = json.load(fh) + if sap_gtid in data and isinstance(data[sap_gtid], dict): + LOG.debug("Found mapping for sap_gtid=%s in metadata file", sap_gtid) + return data[sap_gtid] + LOG.debug("No entry for sap_gtid=%s in metadata file", sap_gtid) + except Exception: + LOG.warning("Unable to read PROJECT_METADATA_FILE at %s", path, exc_info=True) + + # 2) nothing configured + LOG.debug("No KEK mapping available for sap_gtid=%s; returning empty mapping", sap_gtid) + return {} + + def _get_sap_gtid(self, kek_meta_dto: cbase.KEKMetaDTO) -> str | None: + """Resolve 'sap_gtid' from persisted plugin_meta, with possible runtime override.""" + has_meta = bool(kek_meta_dto and getattr(kek_meta_dto, "plugin_meta", None)) + LOG.debug("_get_sap_gtid called; plugin_meta_present=%s", has_meta) + meta = {} + if has_meta: + try: + meta = json.loads(kek_meta_dto.plugin_meta) + except Exception: + LOG.debug("plugin_meta not valid JSON; ignoring persisted value", exc_info=True) + meta = {} + + val = meta.get("sap_gtid") + LOG.debug("_get_sap_gtid resolved sap_gtid=%s", val) + return val + + # ---- Required API ---- + + def get_plugin_name(self) -> str: + LOG.info("SAPKMSCryptoPlugin loaded") + return "sap_kms_crypto" + + def supports(self, type_enum, algorithm=None, bit_length=None, mode=None) -> bool: + LOG.debug("supports called with type=%s algorithm=%s bit_length=%s mode=%s", + type_enum, algorithm, bit_length, mode) + supported = type_enum in ( + cbase.PluginSupportTypes.ENCRYPT_DECRYPT, + cbase.PluginSupportTypes.SYMMETRIC_KEY_GENERATION, + ) + LOG.debug("supports result=%s", supported) + return supported + + def bind_kek_metadata(self, kek_meta_dto: cbase.KEKMetaDTO) -> cbase.KEKMetaDTO: + LOG.debug("bind_kek_metadata called; incoming kek_label=%s", getattr(kek_meta_dto, "kek_label", None)) + kek_meta_dto.plugin_name = self.get_plugin_name() + if not kek_meta_dto.kek_label: + kek_meta_dto.kek_label = "sap-kms-fernet" + LOG.debug("No kek_label provided; defaulted to 'sap-kms-fernet'") + LOG.debug("bind_kek_metadata returning kek_label=%s plugin_name=%s", + kek_meta_dto.kek_label, kek_meta_dto.plugin_name) + return kek_meta_dto + + def encrypt( + self, + encrypt_dto: cbase.EncryptDTO, + kek_meta_dto: cbase.KEKMetaDTO, + project_id: str, + ) -> cbase.ResponseDTO: + LOG.debug("encrypt called for project_id=%s", project_id) + unencrypted = encrypt_dto.unencrypted + if not isinstance(unencrypted, bytes): + LOG.error("encrypt received invalid payload type: %s", type(unencrypted)) + raise ValueError( + u._( + 'Unencrypted data must be a byte type, but was ' + '{unencrypted_type}' + ).format( + unencrypted_type=type(unencrypted) + ) + ) + + sap_gtid = self._get_sap_gtid(kek_meta_dto) + mapping = self._load_sap_gtid_to_kek_mapping(sap_gtid=sap_gtid) + LOG.debug("Encryption selection resolved; sap_gtid=%s mapping_present=%s", + sap_gtid, bool(mapping)) + + try: + f = self._fernet(mapping.get("kek")) + ciphertext = f.encrypt(unencrypted) + LOG.info("Encryption completed for project_id=%s; ciphertext_size=%d bytes", + project_id, len(ciphertext)) + except Exception: + LOG.exception("Encryption operation failed for project_id=%s", project_id) + raise + + resp = cbase.ResponseDTO(cypher_text=ciphertext, kek_meta_extended=sap_gtid) + LOG.debug("encrypt returning ResponseDTO with kek_meta_extended=%s", sap_gtid) + return resp + + def decrypt( + self, + decrypt_dto: cbase.DecryptDTO, + kek_meta_dto: cbase.KEKMetaDTO, + kek_meta_extended: str, + project_id: str, + ) -> bytes: + LOG.debug("decrypt called for project_id=%s sap_gtid=%s", project_id, kek_meta_extended) + mapping = self._load_sap_gtid_to_kek_mapping(sap_gtid=kek_meta_extended) + LOG.debug("Decryption mapping present=%s", bool(mapping)) + try: + f = self._fernet(mapping.get("kek")) + plaintext = f.decrypt(decrypt_dto.encrypted) + LOG.info("Decryption completed for project_id=%s; plaintext_size=%d bytes", + project_id, len(plaintext)) + return plaintext + except Exception: + LOG.exception("Decryption operation failed for project_id=%s", project_id) + raise + + def generate_symmetric( + self, + generate_dto: cbase.GenerateDTO, + kek_meta_dto: cbase.KEKMetaDTO, + project_id: str, + ) -> cbase.ResponseDTO: + LOG.debug("generate_symmetric called for project_id=%s algorithm=%s bit_length=%s mode=%s", + project_id, generate_dto.algorithm, generate_dto.bit_length, generate_dto.mode) + n_bits = generate_dto.bit_length or 256 + if n_bits % 8 != 0 or n_bits <= 0: + LOG.error("Invalid bit_length=%s requested", n_bits) + raise ValueError("bit_length must be a positive multiple of 8") + try: + raw = os.urandom(n_bits // 8) + f = self._fernet(None) + ciphertext = f.encrypt(raw) + LOG.info("Generated symmetric key and encrypted it; ciphertext_size=%d bytes", len(ciphertext)) + return cbase.ResponseDTO(cypher_text=ciphertext, kek_meta_extended=None) + except Exception: + LOG.exception("generate_symmetric operation failed for project_id=%s", project_id) + raise + + def generate_asymmetric( + self, + generate_dto: cbase.GenerateDTO, + kek_meta_dto: cbase.KEKMetaDTO, + project_id: str, + ): + LOG.error("Asymmetric key generation is not supported by SAPKMSCryptoPlugin") + raise NotImplementedError("Asymmetric key generation not supported by SAPKMSCryptoPlugin") diff --git a/barbican/plugin/sap_kms_adapter.py b/barbican/plugin/sap_kms_adapter.py new file mode 100644 index 000000000..6175440c1 --- /dev/null +++ b/barbican/plugin/sap_kms_adapter.py @@ -0,0 +1,222 @@ +# Copyright (c) 2025 SAP SE +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import copy +import json +import base64 +from oslo_log import log as logging + +from barbican.plugin.crypto import base as crypto_base +from barbican.plugin.store_crypto import StoreCryptoAdapterPlugin +from barbican.plugin import store_crypto as sc +from barbican.model import repositories as repos + +from barbican.plugin.crypto.sap_kms_plugin import SAPKMSCryptoPlugin + +LOG = logging.getLogger(__name__) +LOG.debug("Loaded PerSecretKEKStoreAdapter module: %s", __name__) +DEFAULT_KEK_META_KEY = "kek-ref" + +class PerSecretKEKStoreAdapter(StoreCryptoAdapterPlugin): + """Secret-store adapter that honors a per-secret KEK selector via user-metadata, + hard-wired to ToyCryptoPlugin (dev/testing only).""" + + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + # Hard-code the delegate crypto engine + self.encrypting_plugin = SAPKMSCryptoPlugin() + self._metadata_key = DEFAULT_KEK_META_KEY + LOG.info("Initialized PerSecretKEKStoreAdapter using %s; metadata_key=%s", + type(self.encrypting_plugin).__name__, self._metadata_key) + + def _extract_user_metadata(self, context): + """Return user metadata as a dict; always re-load from DB by secret id.""" + secret = getattr(context, "secret_model", None) + secret_id = getattr(secret, "id", None) + LOG.debug("Extracting user metadata for secret_model=%s secret_id=%s", + type(secret).__name__ if secret is not None else None, secret_id) + meta = {} + + # 1) Preferred: repository read (fresh DB state) + try: + meta_repo = repos.get_secret_user_meta_repository() + rows = None + get1 = getattr(meta_repo, "get_metadata_for_secret", None) + get2 = getattr(meta_repo, "get_metadata_by_secret_id", None) + LOG.debug("Repository metadata methods available: get_metadata_for_secret=%s get_metadata_by_secret_id=%s", + bool(get1), bool(get2)) + + if secret_id and callable(get1): + rows = get1(secret_id) + LOG.debug("Repository returned %s rows for secret_id=%s", len(rows) if rows else 0, secret_id) + elif secret_id and callable(get2): + rows = get2(secret_id) + LOG.debug("Repository returned %s rows for secret_id=%s", len(rows) if rows else 0, secret_id) + + if isinstance(rows, dict): + meta.update(rows) + elif rows: + meta.update({ + getattr(r, "key", ""): getattr(r, "value", None) + for r in rows if getattr(r, "key", None) + }) + except Exception: + LOG.exception("Failed to read user metadata from repository for secret_id=%s", secret_id) + + # 2) Fallback: relationship on the loaded model (may be empty due to lazy-load) + if not meta: + LOG.debug("Repository metadata empty; attempting relationship fallback for secret_id=%s", secret_id) + try: + rel = getattr(secret, "user_metadata", None) or [] + items = [m for m in rel] # force lazy load + meta.update({ + getattr(m, "key", ""): getattr(m, "value", None) + for m in items if getattr(m, "key", None) + }) + except Exception: + LOG.exception("Relationship fallback failed when loading user metadata for secret_id=%s", secret_id) + + # 3) Normalize keys so 'kek-ref' and 'kek_ref' both work + norm = {(k or "").lower().replace("_", "-"): v for k, v in meta.items() if k} + if "kek-ref" in norm: + meta["kek-ref"] = norm["kek-ref"] + + LOG.debug("User metadata loaded for secret_id=%s: raw=%r normalized=%r", secret_id, meta, norm) + return meta + + @staticmethod + def _with_transient_override(kek_meta_dto, sap_gtid): + """Copy KEK meta and add secret_kek_ref into plugin_meta (transient hint).""" + LOG.debug("Applying transient plugin_meta override; sap_gtid=%s", sap_gtid) + tmp = copy.deepcopy(kek_meta_dto) + try: + pm = json.loads(tmp.plugin_meta) if tmp.plugin_meta else {} + LOG.debug("Parsed existing plugin_meta: %s", pm) + except Exception: + LOG.debug("Existing plugin_meta not parseable JSON; initializing new plugin_meta") + pm = {} + if sap_gtid: + pm["sap_gtid"] = sap_gtid + if pm: + tmp.plugin_meta = json.dumps(pm) + LOG.debug("Updated transient plugin_meta=%s", tmp.plugin_meta) + return tmp + + # Barbican 7.1 calls: store_plugin.store_secret(secret_dto, context) + def store_secret(self, secret_dto, context): + LOG.debug("store_secret invoked for project=%s secret_model=%s", + getattr(context, "project_model", None), getattr(context, "secret_model", None)) + # 1) Resolve/create KEK backing objects (per-project binding etc.) + kek_datum_model, kek_meta_dto = sc._find_or_create_kek_objects( + self.encrypting_plugin, context.project_model + ) + LOG.debug("Obtained KEK datum id=%s plugin_meta=%s", + getattr(kek_datum_model, "id", None), getattr(kek_meta_dto, "plugin_meta", None)) + + # 2) Read caller-provided KEK selector (e.g., "kek-ref") + meta = self._extract_user_metadata(context) + LOG.debug("User metadata for secret=%s: %r", + getattr(getattr(context, "secret_model", None), "id", "?"), meta) + sap_gtid = meta.get("sap_gtid") or meta.get("prefix-gtid") + LOG.debug("Resolved sap_gtid=%s for encryption selection", sap_gtid) + + # 3) Inject transient override for the crypto plugin + kek_meta_for_encrypt = self._with_transient_override(kek_meta_dto, sap_gtid) + LOG.debug("Transient plugin_meta prepared for encryption") + + # 4) Decode base64 secret payload + try: + secret_bytes = base64.b64decode(secret_dto.secret) + LOG.debug("Decoded secret payload: %d bytes", len(secret_bytes)) + except Exception: + LOG.exception("Failed to decode base64 secret payload for secret_model=%s", + getattr(context, "secret_model", None)) + raise + + # 5) Perform encryption via delegate plugin + try: + response_dto = self.encrypting_plugin.encrypt( + crypto_base.EncryptDTO(secret_bytes), + kek_meta_for_encrypt, + getattr(context.project_model, "external_id", None), + ) + LOG.info("Encryption completed for secret_id=%s; kek_meta_extended=%s", + getattr(context.secret_model, "id", None), + getattr(response_dto, "kek_meta_extended", None)) + except Exception: + LOG.exception("Encryption failed for secret_id=%s", getattr(context.secret_model, "id", None)) + raise + + # 6) Persist ciphertext + kek_meta_extended + try: + result = sc._store_secret_and_datum( + context, context.secret_model, kek_datum_model, response_dto + ) + LOG.info("Persisted encrypted datum for secret_id=%s", getattr(context.secret_model, "id", None)) + return result + except Exception: + LOG.exception("Failed to persist encrypted datum for secret_id=%s", getattr(context.secret_model, "id", None)) + raise + + def generate_symmetric_key(self, key_spec, context): + """Generate a symmetric key. + + :param key_spec: KeySpec that contains details on the type of key to + generate + :param context: StoreCryptoContext for secret + :returns: a dictionary that contains metadata about the key + """ + LOG.info("generate_symmetric invoked: alg=%s bit_length=%s mode=%s project=%s", + getattr(key_spec, "alg", None), + getattr(key_spec, "bit_length", None), + getattr(key_spec, "mode", None), + getattr(context, "project_model", None)) + + try: + kek_datum_model, kek_meta_dto = sc._find_or_create_kek_objects( + self.encrypting_plugin, context.project_model + ) + LOG.debug("KEK binding ready for project %s (plugin_meta=%s)", + getattr(context.project_model, "external_id", None), + getattr(kek_meta_dto, "plugin_meta", None)) + except Exception: + LOG.exception("Failed to obtain or create KEK binding for project %s", + getattr(context.project_model, "external_id", None)) + raise + + generate_dto = crypto_base.GenerateDTO(key_spec.alg, + key_spec.bit_length, + key_spec.mode, None) + LOG.debug("Prepared GenerateDTO: alg=%s bit_length=%s mode=%s", + key_spec.alg, key_spec.bit_length, key_spec.mode) + + try: + response_dto = self.encrypting_plugin.generate_symmetric( + generate_dto, kek_meta_dto, context.project_model.external_id) + LOG.info("Symmetric key generation completed for project=%s", getattr(context.project_model, "external_id", None)) + except Exception: + LOG.exception("Symmetric key generation failed for project %s", + getattr(context.project_model, "external_id", None)) + raise + + try: + result = sc._store_secret_and_datum( + context, context.secret_model, kek_datum_model, response_dto + ) + LOG.info("Persisted generated key datum for secret_id=%s", getattr(context.secret_model, "id", None)) + return result + except Exception: + LOG.exception("Failed to persist generated key datum for secret_id=%s", getattr(context.secret_model, "id", None)) + raise diff --git a/barbican/tests/plugin/crypto/test_sapkms_crypto_plugin.py b/barbican/tests/plugin/crypto/test_sapkms_crypto_plugin.py new file mode 100644 index 000000000..4af922858 --- /dev/null +++ b/barbican/tests/plugin/crypto/test_sapkms_crypto_plugin.py @@ -0,0 +1,105 @@ +# Copyright (c) 2025 SAP SE +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import os +import base64 +import pytest + +# Import the real class from your repo +from barbican.plugin.crypto.sap_kms_plugin import SAPKMSCryptoPlugin + +# Tiny stand-ins for Barbican DTOs +class EncryptDTO: + def __init__(self, unencrypted: bytes): + self.unencrypted = unencrypted + +class DecryptDTO: + def __init__(self, encrypted: bytes): + self.encrypted = encrypted + +class KEKMetaDTO: + def __init__(self, plugin_meta: str | None = None, kek_label: str | None = None, plugin_name: str | None = None): + self.plugin_meta = plugin_meta + self.kek_label = kek_label + self.plugin_name = plugin_name + +def dto_with_gtid(gtid: str | None): + import json + return KEKMetaDTO(plugin_meta=(json.dumps({"sap_gtid": gtid}) if gtid else None)) + +def test_encrypt_decrypt_env_key(tmp_env, monkeypatch): + # Arrange: set an explicit env key (stable across process) + monkeypatch.setenv("SAP_KMS_FERNET_KEY", "s7H0y1jzvH8vE9aJt4nK0mPLgSQc5m2cMcb3XQ8s1BY=") + p = SAPKMSCryptoPlugin() + dto = EncryptDTO(b"test") + + # Act + enc = p.encrypt(dto, dto_with_gtid(None), project_id="proj") + dec = p.decrypt(DecryptDTO(enc.cypher_text), dto_with_gtid(None), enc.kek_meta_extended, "proj") + + # Assert + assert dec == b"test" + assert enc.kek_meta_extended is None # because no sap_gtid was used + +def test_encrypt_decrypt_with_mapping(tmp_env, monkeypatch, mapping_file): + # Arrange: use mapping file + gtid to pick scoped key + monkeypatch.setenv("PROJECT_METADATA_FILE", mapping_file) + # Do NOT set SAP_KMS_FERNET_KEY to ensure mapping is used + p = SAPKMSCryptoPlugin() + dto = EncryptDTO(b"alpha") + kek_meta = dto_with_gtid("GTID-123") + + # Act + enc = p.encrypt(dto, kek_meta, "proj") + # The kek_meta_extended should carry sap_gtid (as simple string in current code) + assert enc.kek_meta_extended == "GTID-123" + + dec = p.decrypt(DecryptDTO(enc.cypher_text), kek_meta, enc.kek_meta_extended, "proj") + assert dec == b"alpha" + +def test_encrypt_rejects_non_bytes(tmp_env): + p = SAPKMSCryptoPlugin() + with pytest.raises(ValueError): + p.encrypt(EncryptDTO("not-bytes"), dto_with_gtid(None), "proj") # type: ignore + +def test_generate_symmetric_default_bits(tmp_env, monkeypatch): + # Arrange + monkeypatch.setenv("SAP_KMS_FERNET_KEY", "s7H0y1jzvH8vE9aJt4nK0mPLgSQc5m2cMcb3XQ8s1BY=") + p = SAPKMSCryptoPlugin() + + class GenerateDTO: + def __init__(self, alg=None, bit_length=None, mode=None, passphrase=None): + self.algorithm = alg + self.bit_length = bit_length + self.mode = mode + self.passphrase = passphrase + + # Act + resp = p.generate_symmetric(GenerateDTO(bit_length=None), dto_with_gtid(None), "proj") + + # Assert: just ensure ciphertext is non-empty and decryptable with env key + assert isinstance(resp.cypher_text, (bytes, bytearray)) + # Basic decrypt check + f = p._fernet(None) + f.decrypt(resp.cypher_text) # should not raise + +def test_ephemeral_key_dev_mode(tmp_env, monkeypatch): + # When no key provided, plugin generates ephemeral (dev) key and still works + # NOTE: this relies on current behavior that auto-generates a key; if you + # later guard this behind ALLOW_EPHEMERAL_KEY, set the env here accordingly. + p = SAPKMSCryptoPlugin() + enc = p.encrypt(EncryptDTO(b"x"), dto_with_gtid(None), "proj") + dec = p.decrypt(DecryptDTO(enc.cypher_text), dto_with_gtid(None), enc.kek_meta_extended, "proj") + assert dec == b"x" diff --git a/barbican/tests/plugin/test_sapkms_adapter.py b/barbican/tests/plugin/test_sapkms_adapter.py new file mode 100644 index 000000000..006fcc8f4 --- /dev/null +++ b/barbican/tests/plugin/test_sapkms_adapter.py @@ -0,0 +1,154 @@ +# Copyright (c) 2025 SAP SE +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +# +import json +import base64 +import types +import pytest + +from barbican.plugin.sap_kms_adapter import PerSecretKEKStoreAdapter + +# Minimal fakes for Barbican pieces the adapter touches + +class FakeSecretModel: + def __init__(self, id_=None, user_metadata=None): + self.id = id_ + self.user_metadata = user_metadata or [] # list of objects with key/value + +class FakeUserMeta: + def __init__(self, key, value): + self.key = key + self.value = value + +class FakeProjectModel: + def __init__(self, external_id="proj-ext"): + self.external_id = external_id + +class FakeContext: + def __init__(self, secret_model=None, project_model=None): + self.secret_model = secret_model or FakeSecretModel() + self.project_model = project_model or FakeProjectModel() + +class FakeSecretDTO: + def __init__(self, secret_bytes: bytes): + # adapter expects base64-encoded string + self.secret = base64.b64encode(secret_bytes).decode("ascii") + +# Patch points in modules the adapter imports +@pytest.fixture(autouse=True) +def patch_repos(monkeypatch): + """Patch repos.get_secret_user_meta_repository to return controlled metadata.""" + class FakeRepo: + def get_metadata_for_secret(self, secret_id): + return [] # default: nothing in DB + + monkeypatch.setattr("barbican.plugin.sap_kms_adapter.repos.get_secret_user_meta_repository", lambda: FakeRepo()) + yield + +@pytest.fixture +def patched_crypto(monkeypatch): + """Patch the underlying crypto plugin used by the adapter to a trivial fake.""" + class FakeCrypto: + def encrypt(self, encrypt_dto, kek_meta_dto, project_id): + # echo-like cipher: prepend a marker to allow verifying flow + ct = b"CT:" + encrypt_dto.unencrypted + # Return object with cypher_text + kek_meta_extended + Resp = types.SimpleNamespace + # propagate sap_gtid from kek_meta_dto.plugin_meta to check path + sap_gtid = None + try: + meta = json.loads(kek_meta_dto.plugin_meta) if kek_meta_dto.plugin_meta else {} + sap_gtid = meta.get("sap_gtid") + except Exception: + sap_gtid = None + return Resp(cypher_text=ct, kek_meta_extended=sap_gtid) + + def generate_symmetric(self, generate_dto, kek_meta_dto, project_id): + Resp = types.SimpleNamespace + return Resp(cypher_text=b"GEN:deadbeef", kek_meta_extended=None) + + # Replace the class construction inside the adapter module + monkeypatch.setattr("barbican.plugin.sap_kms_adapter.SAPKMSCryptoPlugin", FakeCrypto) + yield + +@pytest.fixture +def patch_store_crypto(monkeypatch): + """Patch store_crypto helpers the adapter calls to avoid DB persistence.""" + # sc._find_or_create_kek_objects -> returns (kek_datum_model, kek_meta_dto) + class FakeKEKDatum: + id = "kek-datum-id" + + class FakeKEKMetaDTO: + def __init__(self): + self.plugin_meta = None + self.kek_label = None + self.plugin_name = None + + def fake_find_or_create(plugin, project_model): + return FakeKEKDatum(), FakeKEKMetaDTO() + + # sc._store_secret_and_datum -> just returns a sentinel + def fake_store(context, secret_model, kek_datum_model, response_dto): + return {"stored": True, "len": len(response_dto.cypher_text), "kek_meta_extended": response_dto.kek_meta_extended} + + monkeypatch.setattr("barbican.plugin.sap_kms_adapter.sc._find_or_create_kek_objects", fake_find_or_create) + monkeypatch.setattr("barbican.plugin.sap_kms_adapter.sc._store_secret_and_datum", fake_store) + yield + +def test_store_secret_no_user_metadata(patched_crypto, patch_store_crypto): + adapter = PerSecretKEKStoreAdapter() + ctx = FakeContext(secret_model=FakeSecretModel(id_="S1")) + dto = FakeSecretDTO(b"abcd") + + result = adapter.store_secret(dto, ctx) + assert result["stored"] is True + assert result["len"] == len(b"CT:abcd") + # No gtid provided, so kek_meta_extended should be None + assert result["kek_meta_extended"] is None + +def test_store_secret_with_sap_gtid_on_relationship(patched_crypto, patch_store_crypto): + adapter = PerSecretKEKStoreAdapter() + # Simulate controller attaching user metadata to the secret model before store + meta = [FakeUserMeta("sap_gtid", "GTID-123")] + ctx = FakeContext(secret_model=FakeSecretModel(id_="S2", user_metadata=meta)) + dto = FakeSecretDTO(b"ping") + + result = adapter.store_secret(dto, ctx) + assert result["stored"] is True + assert result["kek_meta_extended"] == "GTID-123" # flowed through transient plugin_meta + +def test_store_secret_base64_error_is_logged_and_raised(patched_crypto, patch_store_crypto): + adapter = PerSecretKEKStoreAdapter() + ctx = FakeContext(secret_model=FakeSecretModel(id_="S3")) + bad = FakeSecretDTO(b"ok") + # corrupt the base64 payload + bad.secret = "%%%NOT-BASE64%%%" + + with pytest.raises(Exception): + adapter.store_secret(bad, ctx) + +def test_generate_symmetric_flow(patched_crypto, patch_store_crypto): + adapter = PerSecretKEKStoreAdapter() + + class KeySpec: + def __init__(self, alg="AES", bit_length=256, mode=None): + self.alg = alg + self.bit_length = bit_length + self.mode = mode + + ctx = FakeContext(secret_model=FakeSecretModel(id_="S4")) + out = adapter.generate_symmetric_key(KeySpec(), ctx) + assert out["stored"] is True + assert out["len"] == len(b"GEN:deadbeef") diff --git a/setup.cfg b/setup.cfg index ac9c8dcbd..1bb0b5b28 100644 --- a/setup.cfg +++ b/setup.cfg @@ -63,12 +63,14 @@ barbican.secretstore.plugin = dogtag_crypto = barbican.plugin.dogtag:DogtagKRAPlugin kmip_plugin = barbican.plugin.kmip_secret_store:KMIPSecretStore vault_plugin = barbican.plugin.vault_secret_store:VaultSecretStore + sap_kms_adapter = barbican.plugin.sap_kms_adapter:PerSecretKEKStoreAdapter barbican.crypto.plugin = p11_crypto = barbican.plugin.crypto.p11_crypto:P11CryptoPlugin simple_crypto = barbican.plugin.crypto.simple_crypto:SimpleCryptoPlugin hsm_partition_crypto = barbican.plugin.crypto.hsm_partition_crypto:HSMPartitionCryptoPlugin utimaco_hsm_crypto = barbican.plugin.crypto.hsm_partition_crypto:UtimacoHSMPartitionCryptoPlugin thales_hsm_crypto = barbican.plugin.crypto.hsm_partition_crypto:ThalesHSMPartitionCryptoPlugin + sap_kms_crypto = barbican.plugin.crypto.sap_kms_plugin:SAPKMSCryptoPlugin barbican.test.crypto.plugin = test_crypto = barbican.tests.crypto.test_plugin:TestCryptoPlugin oslo.config.opts =