Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 6 additions & 2 deletions auditmiddleware/_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,8 +57,12 @@
def _make_uuid(s):
if s.isdigit():
return str(uuid.UUID(int=int(s)))
else:
return s
# Extract the UUID from HATEOAS-style resource refs (e.g. secret_ref,
# container_ref) that carry a full URL instead of a bare UUID.
# pycadf's identifier.py warns when a non-UUID string is used as an id.
if '://' in s:
s = s.rstrip('/').rsplit('/', 1)[-1]
return s


class ConfigError(Exception):
Expand Down
103 changes: 103 additions & 0 deletions auditmiddleware/tests/unit/test_mappings.py
Original file line number Diff line number Diff line change
Expand Up @@ -600,3 +600,106 @@ def test_get_list_zones(self):
self.check_event(request, response, event, taxonomy.ACTION_LIST,
"service/dns/zones",
None, self.service_name)


class BarbicanAuditMappingTest(base.BaseAuditMiddlewareTest):
"""Test the barbican mapping file.

Barbican uses HATEOAS-style refs (full URLs) as ID fields in its
response bodies (e.g. ``secret_ref``, ``container_ref``). The
``custom_id`` mapping points at these fields, so the middleware must
extract the bare UUID from the URL instead of forwarding the whole URL
as the CADF target ID — otherwise pycadf emits a UserWarning about
invalid UUIDs.
"""

def setUp(self):
"""Set up the Barbican audit mapping test."""
super(BarbicanAuditMappingTest, self).setUp()

self.audit_map_file_fixture = os.path.realpath(
"etc/barbican_audit_map.yaml")

self.service_name = 'barbican'
self.service_type = 'data/security/keymanager'

@property
def audit_map(self):
"""The audit mapping filename under test."""
return self.audit_map_file_fixture

def test_post_create_secret_extracts_uuid_from_ref(self):
"""UUID must be extracted from the HATEOAS secret_ref URL.

When a secret is created (POST /v1/secrets) barbican returns a
``secret_ref`` containing a full URL such as:
``https://keymanager.example.com:443/v1/secrets/<uuid>``

The ``custom_id: secret_ref`` mapping causes the middleware to read
this field as the resource ID. Without the URL-stripping fix the
whole URL is passed to pycadf, which issues:

UserWarning: Invalid uuid: https://…/v1/secrets/<uuid>.
To ensure interoperability, identifiers should be a valid uuid.

After the fix only the trailing UUID segment must appear as the CADF
target id.
"""
secret_id = str(uuid.uuid4())
secret_ref = 'https://keymanager-3.eu-de-1.cloud.sap:443/v1/secrets/' \
+ secret_id

url = '/v1/secrets'
req_json = {'name': 'my-secret', 'payload': 'secret-value'}
resp_json = {'secret_ref': secret_ref}

import warnings
with warnings.catch_warnings(record=True) as caught:
warnings.simplefilter('always')
request, response = self.build_api_call(
'POST', url, req_json=req_json, resp_json=resp_json)
event = self.build_event(request, response)
pycadf_warnings = [
w for w in caught
if 'Invalid uuid' in str(w.message)
]

self.assertFalse(
pycadf_warnings,
"pycadf emitted 'Invalid uuid' warnings — the full URL was "
"forwarded as the CADF target id instead of the bare UUID: %s"
% [str(w.message) for w in pycadf_warnings])

self.check_event(request, response, event, taxonomy.ACTION_CREATE,
"data/security/keymanager/secret", secret_id)

def test_post_create_container_extracts_uuid_from_ref(self):
"""Same URL-stripping fix applies to container_ref."""
container_id = str(uuid.uuid4())
container_ref = \
'https://keymanager-3.eu-de-1.cloud.sap:443/v1/containers/' \
+ container_id

url = '/v1/containers'
req_json = {'name': 'my-container', 'type': 'generic'}
resp_json = {'container_ref': container_ref}

import warnings
with warnings.catch_warnings(record=True) as caught:
warnings.simplefilter('always')
request, response = self.build_api_call(
'POST', url, req_json=req_json, resp_json=resp_json)
event = self.build_event(request, response)
pycadf_warnings = [
w for w in caught
if 'Invalid uuid' in str(w.message)
]

self.assertFalse(
pycadf_warnings,
"pycadf emitted 'Invalid uuid' warnings — the full URL was "
"forwarded as the CADF target id instead of the bare UUID: %s"
% [str(w.message) for w in pycadf_warnings])

self.check_event(request, response, event, taxonomy.ACTION_CREATE,
"data/security/keymanager/container", container_id)