diff --git a/.gitignore b/.gitignore index 59da84b..c5d348d 100644 --- a/.gitignore +++ b/.gitignore @@ -27,3 +27,4 @@ MANIFEST # Environments .venv venv/ +.idea/ diff --git a/README.md b/README.md index 15ac016..f644ef3 100644 --- a/README.md +++ b/README.md @@ -37,6 +37,9 @@ Available options: - `token_duration`: Validity period (in seconds) for retieved authorization tokens. - `aws_access_key_id`: Use a specific AWS access key to authenticate with AWS. - `aws_secret_access_key`: Use a specific AWS secret access key to authenticate with AWS. + - `assume_role_arn`: Role ARN to assume (using the configured profile and/or keys) to get the CodeArtifact credentials. + - `assume_role_session_name`: Role session name for the `AssumeRole` call. If not specified, a name identifying the + current user is generated for auditing purposes (e.g. in AWS CloudTrail). For more explanation of these options see the [AWS CLI documentation](https://docs.aws.amazon.com/cli/latest/userguide/cli-configure-files.html). @@ -55,6 +58,9 @@ profile_name=default # Use the following access keys. aws_access_key_id=xxxxxxxxx aws_secret_access_key=xxxxxxxxx + +# Assume the following role to obtain the credentials. +assume_role_arn=arn:aws:iam::xxxxxxxxx:role/xxxxxxxxx ``` ### Multiple Section Configuration (EXPERIMENTAL) diff --git a/keyrings/codeartifact.py b/keyrings/codeartifact.py index 907333d..6daa21b 100644 --- a/keyrings/codeartifact.py +++ b/keyrings/codeartifact.py @@ -106,7 +106,21 @@ def score(candidate): return self.config.get(found_key) +def default_role_session_name(sts_client): + # AssumeRole requires a RoleSessionName. Derive a default from the caller's + # STS identity so the assumed session is traceable in AWS CloudTrail. + user_id = sts_client.get_caller_identity()["UserId"] + + # RoleSessionName only permits [\w+=,.@-] and is limited to 64 characters. + user_id = re.sub(r"[^\w+=,.@-]", "-", user_id) + return f"keyrings.codeartifact-{user_id}"[:64] + + def make_codeartifact_client(options): + # Extract any role to assume; these aren't valid session/client arguments. + assume_role_arn = options.pop("assume_role_arn", None) + assume_role_session_name = options.pop("assume_role_session_name", None) + # Build a session with the provided options. session = boto3.session.Session( # NOTE: Only the session accepts 'profile_name'. @@ -114,6 +128,36 @@ def make_codeartifact_client(options): region_name=options.get("region_name"), ) + # Optionally assume a role to obtain temporary credentials. The session + # above (using the configured profile and/or static keys) is the source + # identity used to call AssumeRole. + if assume_role_arn: + sts_client = session.client( + "sts", + aws_access_key_id=options.get("aws_access_key_id"), + aws_secret_access_key=options.get("aws_secret_access_key"), + ) + + assumed_role = sts_client.assume_role( + RoleArn=assume_role_arn, + RoleSessionName=( + assume_role_session_name or default_role_session_name(sts_client) + ), + ) + + # Build a new session from the temporary credentials. + creds = assumed_role["Credentials"] + session = boto3.session.Session( + region_name=options.get("region_name"), + aws_access_key_id=creds["AccessKeyId"], + aws_secret_access_key=creds["SecretAccessKey"], + aws_session_token=creds["SessionToken"], + ) + + # The temporary credentials replace any static keys for the client. + options.pop("aws_access_key_id", None) + options.pop("aws_secret_access_key", None) + # Create a client for this new session. return session.client("codeartifact", **options) @@ -212,6 +256,16 @@ def get_password(self, service, username): } ) + # Optionally assume a role to obtain the CodeArtifact credentials. + assume_role_arn = config.get("assume_role_arn") + if assume_role_arn: + options.update({"assume_role_arn": assume_role_arn}) + + # An optional, explicit role session name for the AssumeRole call. + assume_role_session_name = config.get("assume_role_session_name") + if assume_role_session_name: + options.update({"assume_role_session_name": assume_role_session_name}) + # Generate a CodeArtifact client using the callback. client = self.make_client(options) diff --git a/tests/test_backend.py b/tests/test_backend.py index 549ac1b..1884a68 100644 --- a/tests/test_backend.py +++ b/tests/test_backend.py @@ -1,5 +1,8 @@ # test_backend.py -- backend tests +import re + +import boto3 import pytest from io import StringIO @@ -12,7 +15,7 @@ from contextlib import contextmanager from tempfile import NamedTemporaryFile -from keyrings.codeartifact import make_codeartifact_client +from keyrings.codeartifact import default_role_session_name, make_codeartifact_client from keyrings.codeartifact import CodeArtifactBackend, CodeArtifactKeyringConfig REGION_NAME = "ca-central-1" @@ -151,6 +154,29 @@ def make_client(options): "verify": "./path/to/certificate.pem", }, ), + # Passing an assume-role ARN and explicit session name through. + ( + """ + [codeartifact] + assume_role_arn = arn:aws:iam::000000000000:role/role + assume_role_session_name = SESSION-NAME + """, + { + "assume_role_arn": "arn:aws:iam::000000000000:role/role", + "assume_role_session_name": "SESSION-NAME", + }, + ), + # A session name is only forwarded alongside a role ARN. + ( + """ + [codeartifact] + assume_role_session_name = SESSION-NAME + """, + { + "assume_role_arn": None, + "assume_role_session_name": None, + }, + ), ], ) def test_backend_default_options(configuration, assertions): @@ -171,3 +197,110 @@ def make_client(options): backend = CodeArtifactBackend(config=config, make_client=make_client) url = codeartifact_pypi_url("domain", "000000000000", "region", "name") credentials = backend.get_credential(url, None) + + +def test_default_role_session_name(): + # The default name is derived from the caller's STS UserId; any characters + # AWS doesn't permit in a RoleSessionName are sanitized away. + sts_client = boto3.session.Session(region_name=REGION_NAME).client( + "sts", region_name=REGION_NAME + ) + + stubber = Stubber(sts_client) + stubber.add_response( + "get_caller_identity", + { + "UserId": "AROAEXAMPLEID:weird session/name", + "Account": "000000000000", + "Arn": "arn:aws:sts::000000000000:assumed-role/role/weird", + }, + {}, + ) + stubber.activate() + + name = default_role_session_name(sts_client) + + assert name == "keyrings.codeartifact-AROAEXAMPLEID-weird-session-name" + assert len(name) <= 64 + assert re.fullmatch(r"[\w+=,.@-]+", name) + stubber.assert_no_pending_responses() + + +@pytest.mark.parametrize( + "session_name_options", + [ + # An explicitly configured role session name. + {"assume_role_session_name": "SESSION-NAME"}, + # No session name: a default identifying the caller is generated. + {}, + ], +) +def test_make_codeartifact_client_assumes_role(monkeypatch, session_name_options): + role_arn = "arn:aws:iam::000000000000:role/role" + + # Build real clients up front so we can attach a stub to STS. + real_session = boto3.session.Session(region_name=REGION_NAME) + sts_client = real_session.client("sts", region_name=REGION_NAME) + codeartifact_client = real_session.client("codeartifact", region_name=REGION_NAME) + + sts_stubber = Stubber(sts_client) + + explicit_session_name = session_name_options.get("assume_role_session_name") + if explicit_session_name: + expected_session_name = explicit_session_name + else: + # Without a configured name, the default is derived from the caller's + # STS UserId via GetCallerIdentity. + sts_stubber.add_response( + "get_caller_identity", + { + "UserId": "AIDAEXAMPLEUSERID", + "Account": "000000000000", + "Arn": "arn:aws:iam::000000000000:user/example", + }, + {}, + ) + expected_session_name = "keyrings.codeartifact-AIDAEXAMPLEUSERID" + + sts_stubber.add_response( + "assume_role", + { + "Credentials": { + "AccessKeyId": "TEMP-ACCESS-KEY-ID", + "SecretAccessKey": "TEMP-SECRET-ACCESS-KEY", + "SessionToken": "TEMP-SESSION-TOKEN", + "Expiration": current_time() + timedelta(hours=1), + }, + }, + {"RoleArn": role_arn, "RoleSessionName": expected_session_name}, + ) + sts_stubber.activate() + + created_sessions = [] + + class FakeSession: + def __init__(self, *args, **kwargs): + self.kwargs = kwargs + created_sessions.append(self) + + def client(self, service, **kwargs): + return sts_client if service == "sts" else codeartifact_client + + # Swap the session factory so we control the clients that get created. + monkeypatch.setattr(boto3.session, "Session", FakeSession) + + options = {"region_name": REGION_NAME, "assume_role_arn": role_arn} + options.update(session_name_options) + + client = make_codeartifact_client(options) + + # The returned client is built from the assumed role's session. + assert client is codeartifact_client + sts_stubber.assert_no_pending_responses() + + # The final session was created from the temporary credentials. + assert created_sessions[-1].kwargs["aws_access_key_id"] == "TEMP-ACCESS-KEY-ID" + assert ( + created_sessions[-1].kwargs["aws_secret_access_key"] == "TEMP-SECRET-ACCESS-KEY" + ) + assert created_sessions[-1].kwargs["aws_session_token"] == "TEMP-SESSION-TOKEN" diff --git a/tests/test_config.py b/tests/test_config.py index 211e2da..5bad713 100644 --- a/tests/test_config.py +++ b/tests/test_config.py @@ -1,21 +1,12 @@ # test_config.py -- config parsing tests - import pytest from io import StringIO -from os.path import dirname, join +from pathlib import Path from keyrings.codeartifact import CodeArtifactKeyringConfig - -@pytest.fixture -def config_file(): - working_directory = dirname(__file__) - - def _config_file(path): - return join(working_directory, "config", path) - - return _config_file +CONFIG_DIR = Path(__file__).parent / "config" @pytest.mark.parametrize( @@ -26,8 +17,8 @@ def _config_file(path): ("domain", "00000000", "ca-central-1", "repository"), ], ) -def test_parse_single_section_only(config_file, parameters): - config = CodeArtifactKeyringConfig(config_file("single_section.cfg")) +def test_parse_single_section_only(parameters): + config = CodeArtifactKeyringConfig(CONFIG_DIR / "single_section.cfg") # A single section has only one configuration. values = config.lookup(*parameters) @@ -89,8 +80,8 @@ def test_bogus_config_returns_empty_configuration(config_data): ), ], ) -def test_multiple_sections_with_defaults(config_file, query, expected): - path = config_file("multiple_sections_with_default.cfg") +def test_multiple_sections_with_defaults(query, expected): + path = CONFIG_DIR / "multiple_sections_with_default.cfg" config = CodeArtifactKeyringConfig(path) values = config.lookup(**query) @@ -115,8 +106,8 @@ def test_multiple_sections_with_defaults(config_file, query, expected): ), ], ) -def test_multiple_sections_no_defaults(config_file, query, expected): - path = config_file("multiple_sections_no_default.cfg") +def test_multiple_sections_no_defaults(query, expected): + path = CONFIG_DIR / "multiple_sections_no_default.cfg" config = CodeArtifactKeyringConfig(path) values = config.lookup(**query)