diff --git a/CHANGELOG.md b/CHANGELOG.md index 1496d00d..82e9cd80 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,6 +6,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] ### Added +- Secrets Manager Service +- TLS cert installer script ### Changed ### Removed diff --git a/fbpcp/entity/secret.py b/fbpcp/entity/secret.py new file mode 100644 index 00000000..a3b07897 --- /dev/null +++ b/fbpcp/entity/secret.py @@ -0,0 +1,19 @@ +#!/usr/bin/env python3 +# Copyright (c) Meta Platforms, Inc. and affiliates. +# +# This source code is licensed under the MIT license found in the +# LICENSE file in the root directory of this source tree. + +# pyre-strict + +from dataclasses import dataclass, field +from typing import Dict + + +@dataclass +class StringSecret: + id: str + name: str + value: str + create_date: str + tags: Dict[str, str] = field(default_factory=dict) diff --git a/fbpcp/gateway/secrets_manager.py b/fbpcp/gateway/secrets_manager.py new file mode 100644 index 00000000..8caa5ff3 --- /dev/null +++ b/fbpcp/gateway/secrets_manager.py @@ -0,0 +1,96 @@ +#!/usr/bin/env python3 +# Copyright (c) Meta Platforms, Inc. and affiliates. +# +# This source code is licensed under the MIT license found in the +# LICENSE file in the root directory of this source tree. + +# pyre-strict + +from functools import reduce +from typing import Any, Dict, List, Optional + +import boto3 +from botocore.client import BaseClient +from fbpcp.decorator.error_handler import error_handler +from fbpcp.entity.secret import StringSecret +from fbpcp.gateway.aws import AWSGateway +from fbpcp.util.aws import convert_list_to_dict + + +class AWSSecretsManagerGateway(AWSGateway): + def __init__( + self, + region: str, + access_key_id: Optional[str] = None, + access_key_data: Optional[str] = None, + config: Optional[Dict[str, Any]] = None, + ) -> None: + super().__init__(region, access_key_id, access_key_data, config) + self.client: BaseClient = boto3.client( + "secretsmanager", region_name=self.region, **self.config + ) + + @error_handler + def create_secret( + self, + secret_name: str, + secret_value: str, + tags: Optional[Dict[str, str]] = None, + ) -> str: + """ + Returns the id (ARN) of the created secret + """ + tags_dict = [] + if tags: + tags_dict = self._generate_tags_dict(tags) + + response = self.client.create_secret( + Name=secret_name, SecretString=secret_value, Tags=tags_dict + ) + return response["ARN"] + + @error_handler + def get_secret( + self, + secret_id: str, + ) -> StringSecret: + # Get secret value. + # It retrieves the current version of the secret. + val_response = self.client.get_secret_value(SecretId=secret_id) + # Get secret details. E.g tags + descr_response = self.client.describe_secret(SecretId=secret_id) + + return self._convert_resp_to_secret(descr_response, val_response) + + @error_handler + def delete_secret( + self, + secret_id: str, + ) -> None: + # Delete secret. + self.client.delete_secret(SecretId=secret_id) + + def _convert_resp_to_secret( + self, descr_resp: Dict[str, Any], val_resp: Dict[str, Any] + ) -> StringSecret: + """ + Encapsulate the responses into a Secret object + """ + id = descr_resp["ARN"] + name = descr_resp["Name"] + value = val_resp.get("SecretString") + create_date = descr_resp.get("CreatedDate") + tags = convert_list_to_dict(descr_resp.get("Tags"), "Key", "Value") + + return StringSecret( + id=id, name=name, value=value, create_date=create_date, tags=tags + ) + + def _generate_tags_dict(self, tags: Dict[str, str]) -> List[Dict[str, str]]: + # Input tag format {"tag1": "v1", "tag2", "v2", ...} + # AWS required format [{"Key": "tag1", "Value": "v1"}, ...} + new_dict = reduce( + lambda x, y: [*x, {"Key": y, "Value": tags[y]}], tags.keys(), [] + ) + + return new_dict diff --git a/fbpcp/service/secrets_manager.py b/fbpcp/service/secrets_manager.py new file mode 100644 index 00000000..e7208683 --- /dev/null +++ b/fbpcp/service/secrets_manager.py @@ -0,0 +1,54 @@ +#!/usr/bin/env python3 +# Copyright (c) Meta Platforms, Inc. and affiliates. +# +# This source code is licensed under the MIT license found in the +# LICENSE file in the root directory of this source tree. + +# pyre-strict + +import abc +from typing import Dict, Optional + +from fbpcp.entity.secret import StringSecret + + +class SecretsManagerService(abc.ABC): + @abc.abstractmethod + def create_secret( + self, secret_name: str, secret_value: str, tags: Optional[Dict[str, str]] = None + ) -> str: + pass + + @abc.abstractmethod + def get_secret( + self, + secret_id: str, + ) -> StringSecret: + pass + + @abc.abstractmethod + async def create_secret_async( + self, secret_name: str, secret_value: str, tags: Optional[Dict[str, str]] = None + ) -> str: + pass + + @abc.abstractmethod + async def get_secret_async( + self, + secret_id: str, + ) -> StringSecret: + pass + + @abc.abstractmethod + def delete_secret( + self, + secret_id: str, + ) -> None: + pass + + @abc.abstractmethod + async def delete_secret_async( + self, + secret_id: str, + ) -> None: + pass diff --git a/fbpcp/service/secrets_manager_aws.py b/fbpcp/service/secrets_manager_aws.py new file mode 100644 index 00000000..336a24a2 --- /dev/null +++ b/fbpcp/service/secrets_manager_aws.py @@ -0,0 +1,74 @@ +#!/usr/bin/env python3 +# Copyright (c) Meta Platforms, Inc. and affiliates. +# +# This source code is licensed under the MIT license found in the +# LICENSE file in the root directory of this source tree. + +# pyre-strict + +import asyncio + +from typing import Any, Dict, Optional + +from fbpcp.entity.secret import StringSecret + +from fbpcp.gateway.secrets_manager import AWSSecretsManagerGateway +from fbpcp.service.secrets_manager import SecretsManagerService + + +class AWSSecretsManagerService(SecretsManagerService): + def __init__( + self, + region: str, + access_key_id: Optional[str] = None, + access_key_data: Optional[str] = None, + config: Optional[Dict[str, Any]] = None, + ) -> None: + self.secret_gateway = AWSSecretsManagerGateway( + region, access_key_id, access_key_data, config + ) + + def create_secret( + self, + secret_name: str, + secret_value: str, + tags: Optional[Dict[str, str]] = None, + ) -> str: + secret_id = self.secret_gateway.create_secret( + secret_name=secret_name, secret_value=secret_value, tags=tags + ) + + return secret_id + + def get_secret( + self, + secret_id: str, + ) -> StringSecret: + # secret id can be ARN or secret name + secret = self.secret_gateway.get_secret(secret_id=secret_id) + + return secret + + async def create_secret_async( + self, + secret_name: str, + secret_value: str, + tags: Optional[Dict[str, str]] = None, + ) -> str: + loop = asyncio.get_running_loop() + result = await loop.run_in_executor( + None, self.create_secret, secret_name, secret_value, tags + ) + return result + + async def get_secret_async(self, secret_id: str) -> StringSecret: + loop = asyncio.get_running_loop() + result = await loop.run_in_executor(None, self.get_secret, secret_id) + return result + + def delete_secret(self, secret_id: str) -> None: + self.secret_gateway.delete_secret(secret_id=secret_id) + + async def delete_secret_async(self, secret_id: str) -> None: + loop = asyncio.get_running_loop() + await loop.run_in_executor(None, self.delete_secret, secret_id) diff --git a/scripts/__init__.py b/scripts/__init__.py new file mode 100644 index 00000000..4b87eb9e --- /dev/null +++ b/scripts/__init__.py @@ -0,0 +1,5 @@ +#!/usr/bin/env python3 +# Copyright (c) Meta Platforms, Inc. and affiliates. +# +# This source code is licensed under the MIT license found in the +# LICENSE file in the root directory of this source tree. diff --git a/scripts/tests/test_tls_cert_installer.py b/scripts/tests/test_tls_cert_installer.py new file mode 100644 index 00000000..ded2e03d --- /dev/null +++ b/scripts/tests/test_tls_cert_installer.py @@ -0,0 +1,143 @@ +#!/usr/bin/env fbpython +# Copyright (c) Meta Platforms, Inc. and affiliates. +# +# This source code is licensed under the MIT license found in the +# LICENSE file in the root directory of this source tree. + +import os +import unittest +from typing import List, Optional + +from unittest.mock import call, MagicMock, patch + +from tls.tls_cert_installer import DEFAULT_REGION, main + + +class TestTlsCertInstaller(unittest.TestCase): + def setUp(self) -> None: + super().setUp() + self.server_cert_content = "test_server_certificate" + self.server_cert_path = "test_server_certificate_path" + self.issuer_cert_content = "test_issuer_certificate" + self.issuer_cert_path = "test_issuer_certificate_path" + self.private_key_content = "test_private_key" + self.private_key_path = "test_private_key_path" + self.home_dir = "/home/onedocker" + self.ip_address = "10.0.0.0" + self.server_uri = "study0.pc.facebook.com" + self.private_key_ref = "test_private_key_ref" + + @patch("tls.tls_cert_installer._write_content_to_file") + @patch("tls.tls_cert_installer.os.getenv") + def test_main_publisher_side( + self, mock_get_env_vars: MagicMock, mock_write: MagicMock + ) -> None: + # Arrange + mock_get_env_vars.side_effect = self._get_env_vars_in_the_order_of_being_called( + server_cert_content=self.server_cert_content, + server_cert_path=self.server_cert_path, + issuer_cert_content=self.issuer_cert_content, + issuer_cert_path=self.issuer_cert_path, + private_key_content=self.private_key_content, + private_key_path=self.private_key_path, + home_dir=self.home_dir, + ) + full_server_cert_path = os.path.join(self.home_dir, self.server_cert_path) + full_issuer_cert_path = os.path.join(self.home_dir, self.issuer_cert_path) + full_private_key_path = os.path.join(self.home_dir, self.private_key_path) + + # Act + main() + + # Assert + mock_write.assert_has_calls( + [ + call(full_server_cert_path, self.server_cert_content), + call(full_issuer_cert_path, self.issuer_cert_content), + call(full_private_key_path, self.private_key_content), + ] + ) + + @patch("tls.tls_cert_installer.os.system") + @patch("tls.tls_cert_installer._write_content_to_file") + @patch("tls.tls_cert_installer.os.getenv") + def test_main_partner_side( + self, + mock_get_env_vars: MagicMock, + mock_write: MagicMock, + mock_os_system: MagicMock, + ) -> None: + # Arrange + mock_get_env_vars.side_effect = self._get_env_vars_in_the_order_of_being_called( + issuer_cert_content=self.issuer_cert_content, + issuer_cert_path=self.issuer_cert_path, + home_dir=self.home_dir, + ip_address=self.ip_address, + server_uri=self.server_uri, + ) + full_issuer_cert_path = os.path.join(self.home_dir, self.issuer_cert_path) + + # Act + main() + + # Assert + mock_write.assert_called_once_with( + full_issuer_cert_path, self.issuer_cert_content + ) + mock_os_system.assert_called_once_with( + f"sudo /home/onedocker/package/write_routing.sh {self.ip_address} {self.server_uri}" + ) + + @patch("tls.tls_cert_installer._get_secret") + @patch("tls.tls_cert_installer._write_content_to_file") + @patch("tls.tls_cert_installer.os.getenv") + def test_main_with_secret( + self, + mock_get_env_vars: MagicMock, + mock_write: MagicMock, + mock_get_secret: MagicMock, + ) -> None: + # Arrange + secret_value = "test_secret_value" + mock_get_secret.return_value = secret_value + mock_get_env_vars.side_effect = self._get_env_vars_in_the_order_of_being_called( + private_key_ref=self.private_key_ref, + private_key_path=self.private_key_path, + home_dir=self.home_dir, + ) + full_private_key_path = os.path.join(self.home_dir, self.private_key_path) + + # Act + main() + + # Assert + mock_get_secret.assert_called_once_with(self.private_key_ref, DEFAULT_REGION) + mock_write.assert_called_with(full_private_key_path, secret_value) + + def _get_env_vars_in_the_order_of_being_called( + self, + server_cert_content: Optional[str] = None, + server_cert_path: Optional[str] = None, + issuer_cert_content: Optional[str] = None, + issuer_cert_path: Optional[str] = None, + private_key_content: Optional[str] = None, + private_key_ref: Optional[str] = None, + private_key_path: Optional[str] = None, + home_dir: Optional[str] = None, + ip_address: Optional[str] = None, + server_uri: Optional[str] = None, + region: Optional[str] = None, + ) -> List[Optional[str]]: + return [ + server_cert_content, + server_cert_path, + issuer_cert_content, + issuer_cert_path, + private_key_content, + private_key_ref, + private_key_path, + home_dir, + ip_address, + server_uri, + region, + ] diff --git a/scripts/tls_cert_installer.py b/scripts/tls_cert_installer.py new file mode 100644 index 00000000..bc5a3706 --- /dev/null +++ b/scripts/tls_cert_installer.py @@ -0,0 +1,116 @@ +#!/usr/bin/env python3 +# Copyright (c) Meta Platforms, Inc. and affiliates. +# +# This source code is licensed under the MIT license found in the +# LICENSE file in the root directory of this source tree. + +# pyre-strict + +import logging +import os +import sys + +from fbpcp.service.secrets_manager_aws import AWSSecretsManagerService + +# environment variables +SERVER_PRIVATE_KEY = "SERVER_PRIVATE_KEY" +SERVER_PRIVATE_KEY_REF = "SERVER_PRIVATE_KEY_REF" +SERVER_CERTIFICATE = "SERVER_CERTIFICATE" +ISSUER_CERTIFICATE = "ISSUER_CERTIFICATE" +SERVER_PRIVATE_KEY_PATH = "SERVER_PRIVATE_KEY_PATH" +SERVER_CERTIFICATE_PATH = "SERVER_CERTIFICATE_PATH" +ISSUER_CERTIFICATE_PATH = "ISSUER_CERTIFICATE_PATH" +HOME_DIR = "HOME" +HOSTALIASES = "HOSTALIASES" +IP_ADDRESS = "IP_ADDRESS" +SERVER_HOSTNAME = "SERVER_HOSTNAME" +REGION = "REGION" + +# other constants +HOST_FILE_PATH = "/etc/hosts" +DEFAULT_REGION = "us-west-2" + + +def _get_env_var_if_set(env_var: str, default_val: str) -> str: + val = os.getenv(env_var) + return val if val else default_val + + +def _write_content_to_file(full_path: str, content: str) -> None: + parent_path = "/".join(full_path.split("/")[:-1]) + os.makedirs(parent_path, exist_ok=True) + with open(full_path, "w") as fw: + fw.write(content) + + +def _get_secret(secret_id: str, region: str) -> str: + secret_svc = AWSSecretsManagerService(region) + return secret_svc.get_secret(secret_id).value + + +def main() -> None: + logger = logging.getLogger() + streamHandler = logging.StreamHandler(sys.stdout) + formatter = logging.Formatter("%(levelname)s:%(filename)s:%(message)s") + logger.setLevel(logging.DEBUG) + streamHandler.setFormatter(formatter) + logger.addHandler(streamHandler) + + logging.info("Reading certificate content from environment variables...") + server_certificate = _get_env_var_if_set(SERVER_CERTIFICATE, "") + server_certificate_path = _get_env_var_if_set(SERVER_CERTIFICATE_PATH, "") + issuer_certificate = _get_env_var_if_set(ISSUER_CERTIFICATE, "") + issuer_certificate_path = _get_env_var_if_set(ISSUER_CERTIFICATE_PATH, "") + private_key = _get_env_var_if_set(SERVER_PRIVATE_KEY, "") + private_key_ref = _get_env_var_if_set(SERVER_PRIVATE_KEY_REF, "") + private_key_path = _get_env_var_if_set(SERVER_PRIVATE_KEY_PATH, "") + home_dir = _get_env_var_if_set(HOME_DIR, "") + ip_address = _get_env_var_if_set(IP_ADDRESS, "") + server_hostname = _get_env_var_if_set(SERVER_HOSTNAME, "") + region = _get_env_var_if_set(REGION, DEFAULT_REGION) + + try: + logging.info("Starting writing certificates...") + if server_certificate_path and server_certificate: + full_server_cert_path = os.path.join(home_dir, server_certificate_path) + _write_content_to_file(full_server_cert_path, server_certificate) + logging.info(f"Wrote server certificate to {full_server_cert_path}") + + if issuer_certificate_path and issuer_certificate: + full_issuer_cert_path = os.path.join(home_dir, issuer_certificate_path) + _write_content_to_file(full_issuer_cert_path, issuer_certificate) + logging.info(f"Wrote issuer certificate to {full_issuer_cert_path}") + + if private_key_path: + full_private_key_path = os.path.join(home_dir, private_key_path) + if private_key_ref: + secret = _get_secret(private_key_ref, region) + _write_content_to_file(full_private_key_path, secret) + logging.info(f"Wrote private_key to {full_private_key_path}") + elif private_key: + _write_content_to_file(full_private_key_path, private_key) + logging.info(f"Wrote private_key to {full_private_key_path}") + + if ip_address and server_hostname: + logging.info("Start setting up routing config in the host file.") + write_routing_script_path = os.path.join( + home_dir, "package/write_routing.sh" + ) + # sudo permission to run this script will be built into onedocker image + # with specified user group. + os.system( + f"sudo {write_routing_script_path} {ip_address} {server_hostname}" + ) + logging.info(f"Wrote IP address and host name to {HOST_FILE_PATH}") + else: + logging.info( + "Routing not configured because at least one of ip_address and server_hostname is not specified." + ) + except Exception as e: + raise Exception( + f"Caught an exception while executing the binary: {e.with_traceback(e.__traceback__)}" + ) from e + + +if __name__ == "__main__": + main() diff --git a/scripts/write_routing.sh b/scripts/write_routing.sh new file mode 100644 index 00000000..7111bfbc --- /dev/null +++ b/scripts/write_routing.sh @@ -0,0 +1,3 @@ +#!/bin/bash + +echo "$1 $2" >> /etc/hosts diff --git a/tests/gateway/test_aws_secrets_manager.py b/tests/gateway/test_aws_secrets_manager.py new file mode 100644 index 00000000..a8c68456 --- /dev/null +++ b/tests/gateway/test_aws_secrets_manager.py @@ -0,0 +1,112 @@ +#!/usr/bin/env python3 +# Copyright (c) Meta Platforms, Inc. and affiliates. +# +# This source code is licensed under the MIT license found in the +# LICENSE file in the root directory of this source tree. + +import unittest +from unittest.mock import patch + +from botocore.exceptions import ClientError + +from fbpcp.entity.secret import StringSecret +from fbpcp.error.pcp import PcpError + +from fbpcp.gateway.secrets_manager import AWSSecretsManagerGateway + + +class TestAWSSecretsManagerGateway(unittest.TestCase): + @patch("boto3.client") + def setUp(self, BotoClient) -> None: + self.gw = AWSSecretsManagerGateway("us-west-2") + self.gw.client = BotoClient() + self.secret_id = "test_id" + self.secret_name = "test_secret_name" + self.secret_value = "test_secret_value" + self.tags = {"key1": "value1", "key2": "value2"} + self.aws_tag_list = [ + {"Key": "key1", "Value": "value1"}, + {"Key": "key2", "Value": "value2"}, + ] + self.client_error = ClientError( + error_response={ + "Error": {"Code": 123, "Message": "test"}, + "ResponseMetadata": {}, + }, + operation_name="test_operation", + ) + + def test_create_secret(self): + # Arrange + full_resp = {"ARN": self.secret_id} + self.gw.client.create_secret.return_value = full_resp + + # Act + resp = self.gw.create_secret(self.secret_name, self.secret_value, self.tags) + + # Assert + self.gw.client.create_secret.assert_called_with( + Name=self.secret_name, + SecretString=self.secret_value, + Tags=self.aws_tag_list, + ) + self.assertEqual(self.secret_id, resp) + + def test_create_secret_throw(self): + # Arrange + self.gw.client.create_secret.side_effect = self.client_error + + # Act & Assert + with self.assertRaises(PcpError): + self.gw.create_secret(self.secret_name, self.secret_value) + + def test_get_secret(self): + # Arrange + date = "03-08-2023" + descr_resp = { + "ARN": self.secret_id, + "Name": self.secret_name, + "CreatedDate": date, + "Tags": self.aws_tag_list, + } + val_resp = {"SecretString": self.secret_value} + expected_result = StringSecret( + id=self.secret_id, + name=self.secret_name, + value=self.secret_value, + create_date=date, + tags=self.tags, + ) + self.gw.client.get_secret_value.return_value = val_resp + self.gw.client.describe_secret.return_value = descr_resp + + # Act + resp = self.gw.get_secret(self.secret_id) + + # Assert + self.assertEqual(resp, expected_result) + self.gw.client.get_secret_value.assert_called_with(SecretId=self.secret_id) + self.gw.client.describe_secret(SecretId=self.secret_id) + + def test_get_secret_throw(self): + # Arrange + self.gw.client.get_secret_value.side_effect = self.client_error + + # Act & Assert + with self.assertRaises(PcpError): + self.gw.get_secret(self.secret_id) + + def test_delete_secret(self): + # Act + self.gw.delete_secret(self.secret_id) + + # Assert + self.gw.client.delete_secret.assert_called_with(SecretId=self.secret_id) + + def test_delete_secret_throw(self): + # Arrange + self.gw.client.delete_secret.side_effect = self.client_error + + # Act & Assert + with self.assertRaises(PcpError): + self.gw.delete_secret(self.secret_name, self.secret_value) diff --git a/tests/service/test_secrets_manager_aws.py b/tests/service/test_secrets_manager_aws.py new file mode 100644 index 00000000..d21ab633 --- /dev/null +++ b/tests/service/test_secrets_manager_aws.py @@ -0,0 +1,107 @@ +#!/usr/bin/env python3 +# Copyright (c) Meta Platforms, Inc. and affiliates. +# +# This source code is licensed under the MIT license found in the +# LICENSE file in the root directory of this source tree. + +import unittest +from unittest import IsolatedAsyncioTestCase +from unittest.mock import patch + +from fbpcp.entity.secret import StringSecret + +from fbpcp.service.secrets_manager_aws import AWSSecretsManagerService + + +class TestAWSSecretsManagerService(unittest.TestCase): + @patch("fbpcp.gateway.secrets_manager.AWSSecretsManagerGateway") + def setUp(self, MockSMGateway): + self.region = "us-west-2" + self.secret_svc = AWSSecretsManagerService(region=self.region) + self.secret_svc.secret_gateway = MockSMGateway() + + def test_create_secret(self): + # Arrange + secret_name = "name" + secret_value = "value" + tags = {"key": "value"} + expected_response = "secret_id" + self.secret_svc.secret_gateway.create_secret.return_value = expected_response + + # Act + resp = self.secret_svc.create_secret(secret_name, secret_value, tags) + + # Assert + self.secret_svc.secret_gateway.create_secret.assert_called_with( + secret_name=secret_name, secret_value=secret_value, tags=tags + ) + self.assertEqual(resp, expected_response) + + def test_get_secret(self): + # Arrange + secret_id = "secret_id" + expected_response = StringSecret( + id=secret_id, name="test", value="test", create_date="03-08-2023" + ) + self.secret_svc.secret_gateway.get_secret.return_value = expected_response + + # Act + resp = self.secret_svc.get_secret(secret_id) + + # Assert + self.assertEqual(resp, expected_response) + self.secret_svc.secret_gateway.get_secret.assert_called_with( + secret_id=secret_id + ) + + def test_delete_secret(self): + # Arrange + secret_id = "secret_id" + + # Act + self.secret_svc.delete_secret(secret_id) + + # Assert + self.secret_svc.secret_gateway.delete_secret.assert_called_with( + secret_id=secret_id + ) + + +class TestAWSSecretsManagerServiceAsync(IsolatedAsyncioTestCase): + @patch("fbpcp.gateway.secrets_manager.AWSSecretsManagerGateway") + def setUp(self, MockSMGateway): + self.region = "us-west-2" + self.secret_svc = AWSSecretsManagerService(region=self.region) + self.secret_svc.secret_gateway = MockSMGateway() + self.secret_name = "name" + self.secret_value = "value" + self.secret_id = "test_id" + self.tags = {"key": "value"} + + @patch("fbpcp.service.secrets_manager_aws.AWSSecretsManagerService.create_secret") + async def test_create_secret_async(self, create_secret_mock): + # Act + await self.secret_svc.create_secret_async( + self.secret_name, self.secret_value, self.tags + ) + + # Assert + create_secret_mock.assert_called_with( + self.secret_name, self.secret_value, self.tags + ) + + @patch("fbpcp.service.secrets_manager_aws.AWSSecretsManagerService.get_secret") + async def test_get_secret_async(self, get_secret_mock): + # Act + await self.secret_svc.get_secret_async(self.secret_id) + + # Assert + get_secret_mock.assert_called_with(self.secret_id) + + @patch("fbpcp.service.secrets_manager_aws.AWSSecretsManagerService.delete_secret") + async def test_delete_secret_async(self, delete_secret_mock): + # Act + await self.secret_svc.delete_secret_async(self.secret_id) + + # Assert + delete_secret_mock.assert_called_with(self.secret_id)