diff --git a/auditmiddleware/_api.py b/auditmiddleware/_api.py index 1ea117e..b9fdf5f 100644 --- a/auditmiddleware/_api.py +++ b/auditmiddleware/_api.py @@ -57,8 +57,12 @@ def _make_uuid(s): if s.isdigit(): return str(uuid.UUID(int=int(s))) - else: - return s + # Extract the UUID from HATEOAS-style resource refs (e.g. secret_ref, + # container_ref) that carry a full URL instead of a bare UUID. + # pycadf's identifier.py warns when a non-UUID string is used as an id. + if '://' in s: + s = s.rstrip('/').rsplit('/', 1)[-1] + return s class ConfigError(Exception): diff --git a/auditmiddleware/tests/unit/test_mappings.py b/auditmiddleware/tests/unit/test_mappings.py index 00a200e..1a9f368 100644 --- a/auditmiddleware/tests/unit/test_mappings.py +++ b/auditmiddleware/tests/unit/test_mappings.py @@ -600,3 +600,106 @@ def test_get_list_zones(self): self.check_event(request, response, event, taxonomy.ACTION_LIST, "service/dns/zones", None, self.service_name) + + +class BarbicanAuditMappingTest(base.BaseAuditMiddlewareTest): + """Test the barbican mapping file. + + Barbican uses HATEOAS-style refs (full URLs) as ID fields in its + response bodies (e.g. ``secret_ref``, ``container_ref``). The + ``custom_id`` mapping points at these fields, so the middleware must + extract the bare UUID from the URL instead of forwarding the whole URL + as the CADF target ID — otherwise pycadf emits a UserWarning about + invalid UUIDs. + """ + + def setUp(self): + """Set up the Barbican audit mapping test.""" + super(BarbicanAuditMappingTest, self).setUp() + + self.audit_map_file_fixture = os.path.realpath( + "etc/barbican_audit_map.yaml") + + self.service_name = 'barbican' + self.service_type = 'data/security/keymanager' + + @property + def audit_map(self): + """The audit mapping filename under test.""" + return self.audit_map_file_fixture + + def test_post_create_secret_extracts_uuid_from_ref(self): + """UUID must be extracted from the HATEOAS secret_ref URL. + + When a secret is created (POST /v1/secrets) barbican returns a + ``secret_ref`` containing a full URL such as: + ``https://keymanager.example.com:443/v1/secrets/`` + + The ``custom_id: secret_ref`` mapping causes the middleware to read + this field as the resource ID. Without the URL-stripping fix the + whole URL is passed to pycadf, which issues: + + UserWarning: Invalid uuid: https://…/v1/secrets/. + To ensure interoperability, identifiers should be a valid uuid. + + After the fix only the trailing UUID segment must appear as the CADF + target id. + """ + secret_id = str(uuid.uuid4()) + secret_ref = 'https://keymanager-3.eu-de-1.cloud.sap:443/v1/secrets/' \ + + secret_id + + url = '/v1/secrets' + req_json = {'name': 'my-secret', 'payload': 'secret-value'} + resp_json = {'secret_ref': secret_ref} + + import warnings + with warnings.catch_warnings(record=True) as caught: + warnings.simplefilter('always') + request, response = self.build_api_call( + 'POST', url, req_json=req_json, resp_json=resp_json) + event = self.build_event(request, response) + pycadf_warnings = [ + w for w in caught + if 'Invalid uuid' in str(w.message) + ] + + self.assertFalse( + pycadf_warnings, + "pycadf emitted 'Invalid uuid' warnings — the full URL was " + "forwarded as the CADF target id instead of the bare UUID: %s" + % [str(w.message) for w in pycadf_warnings]) + + self.check_event(request, response, event, taxonomy.ACTION_CREATE, + "data/security/keymanager/secret", secret_id) + + def test_post_create_container_extracts_uuid_from_ref(self): + """Same URL-stripping fix applies to container_ref.""" + container_id = str(uuid.uuid4()) + container_ref = \ + 'https://keymanager-3.eu-de-1.cloud.sap:443/v1/containers/' \ + + container_id + + url = '/v1/containers' + req_json = {'name': 'my-container', 'type': 'generic'} + resp_json = {'container_ref': container_ref} + + import warnings + with warnings.catch_warnings(record=True) as caught: + warnings.simplefilter('always') + request, response = self.build_api_call( + 'POST', url, req_json=req_json, resp_json=resp_json) + event = self.build_event(request, response) + pycadf_warnings = [ + w for w in caught + if 'Invalid uuid' in str(w.message) + ] + + self.assertFalse( + pycadf_warnings, + "pycadf emitted 'Invalid uuid' warnings — the full URL was " + "forwarded as the CADF target id instead of the bare UUID: %s" + % [str(w.message) for w in pycadf_warnings]) + + self.check_event(request, response, event, taxonomy.ACTION_CREATE, + "data/security/keymanager/container", container_id)