From 5db2b424ac6f575bc7f6c96c597a88d0e5f39927 Mon Sep 17 00:00:00 2001 From: Pepe Barbe Date: Mon, 14 Apr 2025 18:07:01 -0500 Subject: [PATCH] feat: option to assume role to get credentials When fetching credentials after obtaining the boto3 session, we check if the configuration has an optional `assume_role` field, which indicates that role needs to be assumed to obtain the credentials. If found, the role is assumed and used to obtain the credentails. If not the original boto3 session is used to obtain the credentials. --- .gitignore | 1 + README.md | 6 ++ keyrings/codeartifact.py | 54 ++++++++++++++++ tests/test_backend.py | 135 ++++++++++++++++++++++++++++++++++++++- tests/test_config.py | 25 +++----- 5 files changed, 203 insertions(+), 18 deletions(-) diff --git a/.gitignore b/.gitignore index 59da84b..c5d348d 100644 --- a/.gitignore +++ b/.gitignore @@ -27,3 +27,4 @@ MANIFEST # Environments .venv venv/ +.idea/ diff --git a/README.md b/README.md index 15ac016..f644ef3 100644 --- a/README.md +++ b/README.md @@ -37,6 +37,9 @@ Available options: - `token_duration`: Validity period (in seconds) for retieved authorization tokens. - `aws_access_key_id`: Use a specific AWS access key to authenticate with AWS. - `aws_secret_access_key`: Use a specific AWS secret access key to authenticate with AWS. + - `assume_role_arn`: Role ARN to assume (using the configured profile and/or keys) to get the CodeArtifact credentials. + - `assume_role_session_name`: Role session name for the `AssumeRole` call. If not specified, a name identifying the + current user is generated for auditing purposes (e.g. in AWS CloudTrail). For more explanation of these options see the [AWS CLI documentation](https://docs.aws.amazon.com/cli/latest/userguide/cli-configure-files.html). @@ -55,6 +58,9 @@ profile_name=default # Use the following access keys. aws_access_key_id=xxxxxxxxx aws_secret_access_key=xxxxxxxxx + +# Assume the following role to obtain the credentials. +assume_role_arn=arn:aws:iam::xxxxxxxxx:role/xxxxxxxxx ``` ### Multiple Section Configuration (EXPERIMENTAL) diff --git a/keyrings/codeartifact.py b/keyrings/codeartifact.py index 907333d..6daa21b 100644 --- a/keyrings/codeartifact.py +++ b/keyrings/codeartifact.py @@ -106,7 +106,21 @@ def score(candidate): return self.config.get(found_key) +def default_role_session_name(sts_client): + # AssumeRole requires a RoleSessionName. Derive a default from the caller's + # STS identity so the assumed session is traceable in AWS CloudTrail. + user_id = sts_client.get_caller_identity()["UserId"] + + # RoleSessionName only permits [\w+=,.@-] and is limited to 64 characters. + user_id = re.sub(r"[^\w+=,.@-]", "-", user_id) + return f"keyrings.codeartifact-{user_id}"[:64] + + def make_codeartifact_client(options): + # Extract any role to assume; these aren't valid session/client arguments. + assume_role_arn = options.pop("assume_role_arn", None) + assume_role_session_name = options.pop("assume_role_session_name", None) + # Build a session with the provided options. session = boto3.session.Session( # NOTE: Only the session accepts 'profile_name'. @@ -114,6 +128,36 @@ def make_codeartifact_client(options): region_name=options.get("region_name"), ) + # Optionally assume a role to obtain temporary credentials. The session + # above (using the configured profile and/or static keys) is the source + # identity used to call AssumeRole. + if assume_role_arn: + sts_client = session.client( + "sts", + aws_access_key_id=options.get("aws_access_key_id"), + aws_secret_access_key=options.get("aws_secret_access_key"), + ) + + assumed_role = sts_client.assume_role( + RoleArn=assume_role_arn, + RoleSessionName=( + assume_role_session_name or default_role_session_name(sts_client) + ), + ) + + # Build a new session from the temporary credentials. + creds = assumed_role["Credentials"] + session = boto3.session.Session( + region_name=options.get("region_name"), + aws_access_key_id=creds["AccessKeyId"], + aws_secret_access_key=creds["SecretAccessKey"], + aws_session_token=creds["SessionToken"], + ) + + # The temporary credentials replace any static keys for the client. + options.pop("aws_access_key_id", None) + options.pop("aws_secret_access_key", None) + # Create a client for this new session. return session.client("codeartifact", **options) @@ -212,6 +256,16 @@ def get_password(self, service, username): } ) + # Optionally assume a role to obtain the CodeArtifact credentials. + assume_role_arn = config.get("assume_role_arn") + if assume_role_arn: + options.update({"assume_role_arn": assume_role_arn}) + + # An optional, explicit role session name for the AssumeRole call. + assume_role_session_name = config.get("assume_role_session_name") + if assume_role_session_name: + options.update({"assume_role_session_name": assume_role_session_name}) + # Generate a CodeArtifact client using the callback. client = self.make_client(options) diff --git a/tests/test_backend.py b/tests/test_backend.py index 549ac1b..1884a68 100644 --- a/tests/test_backend.py +++ b/tests/test_backend.py @@ -1,5 +1,8 @@ # test_backend.py -- backend tests +import re + +import boto3 import pytest from io import StringIO @@ -12,7 +15,7 @@ from contextlib import contextmanager from tempfile import NamedTemporaryFile -from keyrings.codeartifact import make_codeartifact_client +from keyrings.codeartifact import default_role_session_name, make_codeartifact_client from keyrings.codeartifact import CodeArtifactBackend, CodeArtifactKeyringConfig REGION_NAME = "ca-central-1" @@ -151,6 +154,29 @@ def make_client(options): "verify": "./path/to/certificate.pem", }, ), + # Passing an assume-role ARN and explicit session name through. + ( + """ + [codeartifact] + assume_role_arn = arn:aws:iam::000000000000:role/role + assume_role_session_name = SESSION-NAME + """, + { + "assume_role_arn": "arn:aws:iam::000000000000:role/role", + "assume_role_session_name": "SESSION-NAME", + }, + ), + # A session name is only forwarded alongside a role ARN. + ( + """ + [codeartifact] + assume_role_session_name = SESSION-NAME + """, + { + "assume_role_arn": None, + "assume_role_session_name": None, + }, + ), ], ) def test_backend_default_options(configuration, assertions): @@ -171,3 +197,110 @@ def make_client(options): backend = CodeArtifactBackend(config=config, make_client=make_client) url = codeartifact_pypi_url("domain", "000000000000", "region", "name") credentials = backend.get_credential(url, None) + + +def test_default_role_session_name(): + # The default name is derived from the caller's STS UserId; any characters + # AWS doesn't permit in a RoleSessionName are sanitized away. + sts_client = boto3.session.Session(region_name=REGION_NAME).client( + "sts", region_name=REGION_NAME + ) + + stubber = Stubber(sts_client) + stubber.add_response( + "get_caller_identity", + { + "UserId": "AROAEXAMPLEID:weird session/name", + "Account": "000000000000", + "Arn": "arn:aws:sts::000000000000:assumed-role/role/weird", + }, + {}, + ) + stubber.activate() + + name = default_role_session_name(sts_client) + + assert name == "keyrings.codeartifact-AROAEXAMPLEID-weird-session-name" + assert len(name) <= 64 + assert re.fullmatch(r"[\w+=,.@-]+", name) + stubber.assert_no_pending_responses() + + +@pytest.mark.parametrize( + "session_name_options", + [ + # An explicitly configured role session name. + {"assume_role_session_name": "SESSION-NAME"}, + # No session name: a default identifying the caller is generated. + {}, + ], +) +def test_make_codeartifact_client_assumes_role(monkeypatch, session_name_options): + role_arn = "arn:aws:iam::000000000000:role/role" + + # Build real clients up front so we can attach a stub to STS. + real_session = boto3.session.Session(region_name=REGION_NAME) + sts_client = real_session.client("sts", region_name=REGION_NAME) + codeartifact_client = real_session.client("codeartifact", region_name=REGION_NAME) + + sts_stubber = Stubber(sts_client) + + explicit_session_name = session_name_options.get("assume_role_session_name") + if explicit_session_name: + expected_session_name = explicit_session_name + else: + # Without a configured name, the default is derived from the caller's + # STS UserId via GetCallerIdentity. + sts_stubber.add_response( + "get_caller_identity", + { + "UserId": "AIDAEXAMPLEUSERID", + "Account": "000000000000", + "Arn": "arn:aws:iam::000000000000:user/example", + }, + {}, + ) + expected_session_name = "keyrings.codeartifact-AIDAEXAMPLEUSERID" + + sts_stubber.add_response( + "assume_role", + { + "Credentials": { + "AccessKeyId": "TEMP-ACCESS-KEY-ID", + "SecretAccessKey": "TEMP-SECRET-ACCESS-KEY", + "SessionToken": "TEMP-SESSION-TOKEN", + "Expiration": current_time() + timedelta(hours=1), + }, + }, + {"RoleArn": role_arn, "RoleSessionName": expected_session_name}, + ) + sts_stubber.activate() + + created_sessions = [] + + class FakeSession: + def __init__(self, *args, **kwargs): + self.kwargs = kwargs + created_sessions.append(self) + + def client(self, service, **kwargs): + return sts_client if service == "sts" else codeartifact_client + + # Swap the session factory so we control the clients that get created. + monkeypatch.setattr(boto3.session, "Session", FakeSession) + + options = {"region_name": REGION_NAME, "assume_role_arn": role_arn} + options.update(session_name_options) + + client = make_codeartifact_client(options) + + # The returned client is built from the assumed role's session. + assert client is codeartifact_client + sts_stubber.assert_no_pending_responses() + + # The final session was created from the temporary credentials. + assert created_sessions[-1].kwargs["aws_access_key_id"] == "TEMP-ACCESS-KEY-ID" + assert ( + created_sessions[-1].kwargs["aws_secret_access_key"] == "TEMP-SECRET-ACCESS-KEY" + ) + assert created_sessions[-1].kwargs["aws_session_token"] == "TEMP-SESSION-TOKEN" diff --git a/tests/test_config.py b/tests/test_config.py index 211e2da..5bad713 100644 --- a/tests/test_config.py +++ b/tests/test_config.py @@ -1,21 +1,12 @@ # test_config.py -- config parsing tests - import pytest from io import StringIO -from os.path import dirname, join +from pathlib import Path from keyrings.codeartifact import CodeArtifactKeyringConfig - -@pytest.fixture -def config_file(): - working_directory = dirname(__file__) - - def _config_file(path): - return join(working_directory, "config", path) - - return _config_file +CONFIG_DIR = Path(__file__).parent / "config" @pytest.mark.parametrize( @@ -26,8 +17,8 @@ def _config_file(path): ("domain", "00000000", "ca-central-1", "repository"), ], ) -def test_parse_single_section_only(config_file, parameters): - config = CodeArtifactKeyringConfig(config_file("single_section.cfg")) +def test_parse_single_section_only(parameters): + config = CodeArtifactKeyringConfig(CONFIG_DIR / "single_section.cfg") # A single section has only one configuration. values = config.lookup(*parameters) @@ -89,8 +80,8 @@ def test_bogus_config_returns_empty_configuration(config_data): ), ], ) -def test_multiple_sections_with_defaults(config_file, query, expected): - path = config_file("multiple_sections_with_default.cfg") +def test_multiple_sections_with_defaults(query, expected): + path = CONFIG_DIR / "multiple_sections_with_default.cfg" config = CodeArtifactKeyringConfig(path) values = config.lookup(**query) @@ -115,8 +106,8 @@ def test_multiple_sections_with_defaults(config_file, query, expected): ), ], ) -def test_multiple_sections_no_defaults(config_file, query, expected): - path = config_file("multiple_sections_no_default.cfg") +def test_multiple_sections_no_defaults(query, expected): + path = CONFIG_DIR / "multiple_sections_no_default.cfg" config = CodeArtifactKeyringConfig(path) values = config.lookup(**query)