Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
__pycache__/
*.py[cod]
*$py.class
.pytest_cache/

# Distribution / packaging
.Python
Expand Down
30 changes: 27 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -34,15 +34,15 @@ Run `keyring diagnose` to find its as the location; it varies between different
Available options:

- `profile_name`: Use a specific AWS profile to authenticate with AWS.
- `token_duration`: Validity period (in seconds) for retieved authorization tokens.
- `token_duration`: Validity period (in seconds) for retrieved authorization tokens.
- `aws_access_key_id`: Use a specific AWS access key to authenticate with AWS.
- `aws_secret_access_key`: Use a specific AWS secret access key to authenticate with AWS.

For more explanation of these options see the [AWS CLI documentation](https://docs.aws.amazon.com/cli/latest/userguide/cli-configure-files.html).

### Single Section Configuration
### Single Section Configuration - boto3

A trivial example `keyringrc.cfg` section for a single account:
A trivial example `keyringrc.cfg` section for a single account that uses the `boto3` client:

```ini
[codeartifact]
Expand All @@ -57,6 +57,30 @@ aws_access_key_id=xxxxxxxxx
aws_secret_access_key=xxxxxxxxx
```

### Single Section Configuration - teleport

A trivial example `keyringrc.cfg` section for a single account that uses the `tsh` client.
Requires the [Teleport](https://goteleport.com/) client to be installed and configured.

```ini
[codeartifact]
# Use the tsh binary to create the ca token.
# Can be overridden by the CA_KEYRING_CLIENT environment variable.
default_client = tsh

# Tokens should only be valid for 30 minutes.
token_duration=1800

# teleport proxy to use for authentication.
teleport_proxy = foo.teleport.sh

# name of the teleport application to use for authentication.
tsh_app_name = test_app_name

# name of the teleport role to use for authentication.
tsh_aws_role_name = test_aws_role_name
```

### Multiple Section Configuration (EXPERIMENTAL)

This backend can use multiple sections to select different configuration values.
Expand Down
Empty file added keyrings/clients/__init__.py
Empty file.
89 changes: 89 additions & 0 deletions keyrings/clients/boto3.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
import boto3
import boto3.session

import os
from datetime import datetime

import logging

logging.getLogger("keyrings.codeartifact")


class Boto3CAClient:
def __init__(
self,
region: str = None,
domain: str = None,
account: str = None,
profile_name: str = None,
aws_access_key_id: str = None,
aws_secret_access_key: str = None,
session=None,
token_duration: int = 3600,
):
"""
Initialize the boto3 codeartifact client.

:param region: AWS region to use.
:param domain: CodeArtifact domain name.
:param account: AWS account ID.
:param profile_name: AWS profile name (optional).
"""
if region:
self.region = region
else:
self.region = os.getenv("AWS_REGION")
self.domain = domain
self.account = account
self.profile_name = profile_name
self.aws_access_key_id = aws_access_key_id
self.aws_secret_access_key = aws_secret_access_key
self.token_duration = token_duration
if session:
self.session = session
else:
self.session = boto3.session.Session()

def _get_codeartifact_client(self):
# CodeArtifact requires a region.
kwargs = {"region_name": self.region}

# If a profile name was provided, use it.
if self.profile_name:
kwargs.update({"profile_name": self.profile_name})

# If static access/secret keys were provided, use them.
if self.aws_access_key_id and self.aws_secret_access_key:
kwargs.update(
{
"aws_access_key_id": self.aws_access_key_id,
"aws_secret_access_key": self.aws_secret_access_key,
}
)

# Build a CodeArtifact client from the session.
return self.session.client("codeartifact", **kwargs)

def get_authorization_token(self):
"""
Get the CodeArtifact authorization token.

:return: Authorization token as a string.
"""
client = self._get_codeartifact_client()
response = client.get_authorization_token(
domain=self.domain,
domainOwner=self.account,
durationSeconds=self.token_duration,
)

# Figure out our local timezone from the current time.
tzinfo = datetime.now().astimezone().tzinfo
now = datetime.now(tz=tzinfo)

# Give up if the token has already expired.
if response.get("expiration", now) <= now:
logging.warning("Received an expired CodeArtifact token!")
return

return response.get("authorizationToken")
132 changes: 132 additions & 0 deletions keyrings/clients/tsh.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
import subprocess
import re
import os

import logging

logging.getLogger("keyrings.codeartifact")


class TeleportCAClient:
def __init__(
self,
account: str,
domain: str,
region: str,
teleport_proxy: str = None,
tsh_app_name: str = None,
tsh_aws_role_name: str = None,
**kwargs,
):
"""
Initialize the teleport codeartifact client.
This class is responsible for managing the Teleport login and app authentication
to retrieve the CodeArtifact authorization token.
"""
self.region = region
self.domain = domain
self.account = account
self.tsh_aws_role_name = tsh_aws_role_name
self.tsh_app_name = tsh_app_name
self.teleport_proxy = teleport_proxy

def _get_teleport_path_status(self):
"""Check if the tsh command is available in the system path."""
try:
subprocess.run(
["tsh", "version"],
capture_output=True,
text=True,
check=True,
)
return True
except subprocess.CalledProcessError as e:
raise RuntimeError(
"Error checking tsh command. Confirm it is installed and available via PATH: %s"
% e.stderr.strip()
) from e

def _get_teleport_login_status(self):
"""Check if the user is logged into Teleport."""
try:
subprocess.run(
["tsh", "status"],
capture_output=True,
text=True,
check=True,
)
return True
except subprocess.CalledProcessError as e:
logging.debug("Error checking Teleport login status: %s", e.stderr.strip())
return False

def _teleport_login(self):
"""Login to Teleport using the tsh command."""
try:
login_command = f"tsh login --proxy={self.teleport_proxy}"
subprocess.run(
login_command.split(" "),
capture_output=True,
text=True,
check=True,
)
return True
except subprocess.CalledProcessError as e:
logging.debug("Error logging into Teleport: %s", e.stderr.strip())
return False

def _get_teleport_app_auth_status(self):
"""Check if the user is authenticated for the Teleport app."""
try:
subprocess.run(
["tsh", "app", "config", f"{self.tsh_app_name}"],
capture_output=True,
text=True,
check=True,
)
return True
except subprocess.CalledProcessError as e:
logging.debug(
"Error checking Teleport app auth status: %s", e.stderr.strip()
)
return False

def _teleport_app_login(self):
"""Login to the Teleport app using the tsh command."""
try:
login_command = (
f"tsh app login {self.tsh_app_name} --aws-role {self.tsh_aws_role_name}"
)
subprocess.run(
login_command.split(" "),
capture_output=True,
text=True,
check=True,
)
return True
except subprocess.CalledProcessError as e:
logging.debug("Error logging into Teleport app: %s", e.stderr.strip())
return False

def _get_ca_token(self):
"""use tsh prefixed aws cli command to generate codeartifact token"""
try:
get_ca_token_command = f"tsh aws codeartifact get-authorization-token --domain {self.domain} --domain-owner {self.account} --query authorizationToken --region {self.region} --output text"
get_ca_token_command_list = get_ca_token_command.split(" ")
tsh_output = subprocess.run(
get_ca_token_command_list,
capture_output=True,
text=True,
check=True,
)
return tsh_output.stdout.strip()
except subprocess.CalledProcessError as e:
logging.debug("Error getting CA token: %s", e.stderr.strip())

def get_authorization_token(self):
"""Get the CodeArtifact authorization token."""
if not self._get_teleport_login_status():
self._teleport_login()
if not self._get_teleport_app_auth_status():
self._teleport_app_login()
return self._get_ca_token()
Loading