Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -27,3 +27,4 @@ MANIFEST
# Environments
.venv
venv/
.idea/
6 changes: 6 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,9 @@ Available options:
- `token_duration`: Validity period (in seconds) for retieved authorization tokens.
- `aws_access_key_id`: Use a specific AWS access key to authenticate with AWS.
- `aws_secret_access_key`: Use a specific AWS secret access key to authenticate with AWS.
- `assume_role_arn`: Role ARN to assume (using the configured profile and/or keys) to get the CodeArtifact credentials.
- `assume_role_session_name`: Role session name for the `AssumeRole` call. If not specified, a name identifying the
current user is generated for auditing purposes (e.g. in AWS CloudTrail).

For more explanation of these options see the [AWS CLI documentation](https://docs.aws.amazon.com/cli/latest/userguide/cli-configure-files.html).

Expand All @@ -55,6 +58,9 @@ profile_name=default
# Use the following access keys.
aws_access_key_id=xxxxxxxxx
aws_secret_access_key=xxxxxxxxx

# Assume the following role to obtain the credentials.
assume_role_arn=arn:aws:iam::xxxxxxxxx:role/xxxxxxxxx
```

### Multiple Section Configuration (EXPERIMENTAL)
Expand Down
54 changes: 54 additions & 0 deletions keyrings/codeartifact.py
Original file line number Diff line number Diff line change
Expand Up @@ -106,14 +106,58 @@ def score(candidate):
return self.config.get(found_key)


def default_role_session_name(sts_client):
# AssumeRole requires a RoleSessionName. Derive a default from the caller's
# STS identity so the assumed session is traceable in AWS CloudTrail.
user_id = sts_client.get_caller_identity()["UserId"]

# RoleSessionName only permits [\w+=,.@-] and is limited to 64 characters.
user_id = re.sub(r"[^\w+=,.@-]", "-", user_id)
return f"keyrings.codeartifact-{user_id}"[:64]


def make_codeartifact_client(options):
# Extract any role to assume; these aren't valid session/client arguments.
assume_role_arn = options.pop("assume_role_arn", None)
assume_role_session_name = options.pop("assume_role_session_name", None)

# Build a session with the provided options.
session = boto3.session.Session(
# NOTE: Only the session accepts 'profile_name'.
profile_name=options.pop("profile_name", None),
region_name=options.get("region_name"),
)

# Optionally assume a role to obtain temporary credentials. The session
# above (using the configured profile and/or static keys) is the source
# identity used to call AssumeRole.
if assume_role_arn:
sts_client = session.client(
"sts",
aws_access_key_id=options.get("aws_access_key_id"),
aws_secret_access_key=options.get("aws_secret_access_key"),
)

assumed_role = sts_client.assume_role(
RoleArn=assume_role_arn,
RoleSessionName=(
assume_role_session_name or default_role_session_name(sts_client)
),
)

# Build a new session from the temporary credentials.
creds = assumed_role["Credentials"]
session = boto3.session.Session(
region_name=options.get("region_name"),
aws_access_key_id=creds["AccessKeyId"],
aws_secret_access_key=creds["SecretAccessKey"],
aws_session_token=creds["SessionToken"],
)

# The temporary credentials replace any static keys for the client.
options.pop("aws_access_key_id", None)
options.pop("aws_secret_access_key", None)

# Create a client for this new session.
return session.client("codeartifact", **options)

Expand Down Expand Up @@ -212,6 +256,16 @@ def get_password(self, service, username):
}
)

# Optionally assume a role to obtain the CodeArtifact credentials.
assume_role_arn = config.get("assume_role_arn")
if assume_role_arn:
options.update({"assume_role_arn": assume_role_arn})

# An optional, explicit role session name for the AssumeRole call.
assume_role_session_name = config.get("assume_role_session_name")
if assume_role_session_name:
options.update({"assume_role_session_name": assume_role_session_name})

# Generate a CodeArtifact client using the callback.
client = self.make_client(options)

Expand Down
135 changes: 134 additions & 1 deletion tests/test_backend.py
Comment thread
elventear marked this conversation as resolved.
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
# test_backend.py -- backend tests

import re

import boto3
import pytest

from io import StringIO
Expand All @@ -12,7 +15,7 @@
from contextlib import contextmanager
from tempfile import NamedTemporaryFile

from keyrings.codeartifact import make_codeartifact_client
from keyrings.codeartifact import default_role_session_name, make_codeartifact_client
from keyrings.codeartifact import CodeArtifactBackend, CodeArtifactKeyringConfig

REGION_NAME = "ca-central-1"
Expand Down Expand Up @@ -151,6 +154,29 @@ def make_client(options):
"verify": "./path/to/certificate.pem",
},
),
# Passing an assume-role ARN and explicit session name through.
(
"""
[codeartifact]
assume_role_arn = arn:aws:iam::000000000000:role/role
assume_role_session_name = SESSION-NAME
""",
{
"assume_role_arn": "arn:aws:iam::000000000000:role/role",
"assume_role_session_name": "SESSION-NAME",
},
),
# A session name is only forwarded alongside a role ARN.
(
"""
[codeartifact]
assume_role_session_name = SESSION-NAME
""",
{
"assume_role_arn": None,
"assume_role_session_name": None,
},
),
],
)
def test_backend_default_options(configuration, assertions):
Expand All @@ -171,3 +197,110 @@ def make_client(options):
backend = CodeArtifactBackend(config=config, make_client=make_client)
url = codeartifact_pypi_url("domain", "000000000000", "region", "name")
credentials = backend.get_credential(url, None)


def test_default_role_session_name():
# The default name is derived from the caller's STS UserId; any characters
# AWS doesn't permit in a RoleSessionName are sanitized away.
sts_client = boto3.session.Session(region_name=REGION_NAME).client(
"sts", region_name=REGION_NAME
)

stubber = Stubber(sts_client)
stubber.add_response(
"get_caller_identity",
{
"UserId": "AROAEXAMPLEID:weird session/name",
"Account": "000000000000",
"Arn": "arn:aws:sts::000000000000:assumed-role/role/weird",
},
{},
)
stubber.activate()

name = default_role_session_name(sts_client)

assert name == "keyrings.codeartifact-AROAEXAMPLEID-weird-session-name"
assert len(name) <= 64
assert re.fullmatch(r"[\w+=,.@-]+", name)
stubber.assert_no_pending_responses()


@pytest.mark.parametrize(
"session_name_options",
[
# An explicitly configured role session name.
{"assume_role_session_name": "SESSION-NAME"},
# No session name: a default identifying the caller is generated.
{},
],
)
def test_make_codeartifact_client_assumes_role(monkeypatch, session_name_options):
role_arn = "arn:aws:iam::000000000000:role/role"

# Build real clients up front so we can attach a stub to STS.
real_session = boto3.session.Session(region_name=REGION_NAME)
sts_client = real_session.client("sts", region_name=REGION_NAME)
codeartifact_client = real_session.client("codeartifact", region_name=REGION_NAME)

sts_stubber = Stubber(sts_client)

explicit_session_name = session_name_options.get("assume_role_session_name")
if explicit_session_name:
expected_session_name = explicit_session_name
else:
# Without a configured name, the default is derived from the caller's
# STS UserId via GetCallerIdentity.
sts_stubber.add_response(
"get_caller_identity",
{
"UserId": "AIDAEXAMPLEUSERID",
"Account": "000000000000",
"Arn": "arn:aws:iam::000000000000:user/example",
},
{},
)
expected_session_name = "keyrings.codeartifact-AIDAEXAMPLEUSERID"

sts_stubber.add_response(
"assume_role",
{
"Credentials": {
"AccessKeyId": "TEMP-ACCESS-KEY-ID",
"SecretAccessKey": "TEMP-SECRET-ACCESS-KEY",
"SessionToken": "TEMP-SESSION-TOKEN",
"Expiration": current_time() + timedelta(hours=1),
},
},
{"RoleArn": role_arn, "RoleSessionName": expected_session_name},
)
sts_stubber.activate()

created_sessions = []

class FakeSession:
def __init__(self, *args, **kwargs):
self.kwargs = kwargs
created_sessions.append(self)

def client(self, service, **kwargs):
return sts_client if service == "sts" else codeartifact_client

# Swap the session factory so we control the clients that get created.
monkeypatch.setattr(boto3.session, "Session", FakeSession)

options = {"region_name": REGION_NAME, "assume_role_arn": role_arn}
options.update(session_name_options)

client = make_codeartifact_client(options)

# The returned client is built from the assumed role's session.
assert client is codeartifact_client
sts_stubber.assert_no_pending_responses()

# The final session was created from the temporary credentials.
assert created_sessions[-1].kwargs["aws_access_key_id"] == "TEMP-ACCESS-KEY-ID"
assert (
created_sessions[-1].kwargs["aws_secret_access_key"] == "TEMP-SECRET-ACCESS-KEY"
)
assert created_sessions[-1].kwargs["aws_session_token"] == "TEMP-SESSION-TOKEN"
25 changes: 8 additions & 17 deletions tests/test_config.py
Original file line number Diff line number Diff line change
@@ -1,21 +1,12 @@
# test_config.py -- config parsing tests

import pytest

from io import StringIO
from os.path import dirname, join
from pathlib import Path

from keyrings.codeartifact import CodeArtifactKeyringConfig


@pytest.fixture
def config_file():
working_directory = dirname(__file__)

def _config_file(path):
return join(working_directory, "config", path)

return _config_file
CONFIG_DIR = Path(__file__).parent / "config"


@pytest.mark.parametrize(
Expand All @@ -26,8 +17,8 @@ def _config_file(path):
("domain", "00000000", "ca-central-1", "repository"),
],
)
def test_parse_single_section_only(config_file, parameters):
config = CodeArtifactKeyringConfig(config_file("single_section.cfg"))
def test_parse_single_section_only(parameters):
config = CodeArtifactKeyringConfig(CONFIG_DIR / "single_section.cfg")

# A single section has only one configuration.
values = config.lookup(*parameters)
Expand Down Expand Up @@ -89,8 +80,8 @@ def test_bogus_config_returns_empty_configuration(config_data):
),
],
)
def test_multiple_sections_with_defaults(config_file, query, expected):
path = config_file("multiple_sections_with_default.cfg")
def test_multiple_sections_with_defaults(query, expected):
path = CONFIG_DIR / "multiple_sections_with_default.cfg"
config = CodeArtifactKeyringConfig(path)
values = config.lookup(**query)

Expand All @@ -115,8 +106,8 @@ def test_multiple_sections_with_defaults(config_file, query, expected):
),
],
)
def test_multiple_sections_no_defaults(config_file, query, expected):
path = config_file("multiple_sections_no_default.cfg")
def test_multiple_sections_no_defaults(query, expected):
path = CONFIG_DIR / "multiple_sections_no_default.cfg"
config = CodeArtifactKeyringConfig(path)
values = config.lookup(**query)

Expand Down