Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions barbican/api/transfer/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@

146 changes: 146 additions & 0 deletions barbican/api/transfer/api.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,146 @@
# Copyright 2024 SAP SE
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.

"""
Handles all requests relating to transferring ownership of secrets.
"""
secret_transfer_opts = [
cfg.IntOpt('secret_transfer_salt_length', default=8,
help='The number of characters in the salt.'),
cfg.IntOpt('secret_transfer_key_length', default=16,
help='The number of characters in the '
'autogenerated auth key.'), ]

CONF = cfg.CONF
CONF.register_opts(secret_transfer_opts)

LOG = logging.getLogger(__name__)

from barbican.api import api
from barbican.common import base
from barbican import consumers, repo
from barbican.models import Secret, SecretConsumer

class SecretTransferController(base.BaseController):
"""
API for interacting secret transfers.
"""

def __init__(self, secret):
super().__init__()
self.secret = secret
self.consumers = consumers.SecretConsumersController(secret)
self.consumer_repo = repo.get_secret_consumer_repository()
self.transport_key_repo = repo.get_transport_key_repository()
self.secret_repo = repo.get_secret_repository()

@index.when(method='PUT')
@utils.allow_all_content_types
@controllers.handle_exceptions(u._('Secret transfer'))
@controllers.enforce_rbac('secret:transfer')
def transfer_secret(self, consumer_data):
"""
Transfers the secret to the specified consumer.
"""

# Generate a random auth key and salt
salt = self._get_random_string(CONF.secret_transfer_salt_length)
auth_key = self._get_random_string(CONF.secret_transfer_key_length)
crypt_hash = self._get_crypt_hash(salt, auth_key)

# Transfer the secret
consumer = self.consumers.create(consumer_data)
self.consumers.transfer_secret(consumer, self.secret)

# Update the database
secret_consumer = SecretConsumer(
secret_id=self.secret.id,
consumer_id=consumer.id,
status='ACTIVE',
salt=salt,
crypt_hash=crypt_hash
)
self.consumer_repo.save(secret_consumer)

# Transfer the secret UUID
self.secret.uuid = uuid.uuid4()
self.secret_repo.save(self.secret)

# Include source and target projects in the response
source_project = self.secret.project
target_project = consumer.project

# Include secret metadata, payload, and secret consumers in the response
secret_fields = putil.mime_types.augment_fields_with_content_types(
self.secret)
secret_fields['consumers'] = self.consumers.get_all()
secret_fields['metadata'] = self.secret.metadata
secret_fields['payload'] = self.secret.payload

LOG.info('Transferred secret %s to consumer %s', self.secret.id, consumer_data)
return {'secret_id': self.secret.id,
'consumer_id': consumer.id,
'auth_key': auth_key,
'source_project': source_project,
'target_project': target_project,
'secret_metadata': secret_fields}

@index.when(method='GET')
@utils.allow_all_content_types
@controllers.handle_exceptions(u._('Secret retrieval'))
@controllers.enforce_rbac('secret:get')
def retrieve_secret(self, consumer_id, auth_key):
"""
Retrieves the secret for the specified consumer.
"""
# Retrieve the consumer
consumer = self.consumer_repo.get(consumer_id)

# Check the auth key and salt
secret_consumer = self.consumer_repo.get_by_consumer_id(consumer_id)
if secret_consumer.crypt_hash!= self._get_crypt_hash(secret_consumer.salt, auth_key):
raise exceptions.NotAuthorized('Invalid auth key')

# Retrieve the secret
secret = self.consumers.retrieve_secret(consumer, self.secret)

# Include secret metadata and secret consumers in the response
secret_fields = putil.mime_types.augment_fields_with_content_types(
secret)
secret_fields['consumers'] = self.consumers.get_all()
secret_fields['metadata'] = secret.metadata

LOG.info('Retrieved secret %s for consumer %s', self.secret.id, consumer_id)
return secret_fields

@index.when(method='DELETE')
@utils.allow_all_content_types
@controllers.handle_exceptions(u._('Secret deletion'))
@controllers.enforce_rbac('secret:delete')
def delete_secret(self, consumer_id):
"""
Deletes the secret for the specifiedconsumer.
"""
LOG.info('Deleting secret %s for consumer %s', self.secret.id, consumer_id)
# Retrieve the consumer
consumer = self.consumer_repo.get(consumer_id)

# Delete the secret
self.consumers.delete_secret(consumer, self.secret)

# Update the database
secret_consumer = self.consumer_repo.get_by_consumer_id(consumer_id)
self.consumer_repo.delete(secret_consumer)

LOG.info('Deleted secret %s for consumer %s', self.secret.id, consumer_id)