From 24ca1ac7b03e11d62c05c7093a72d9f53f11d042 Mon Sep 17 00:00:00 2001 From: Casey Bodley Date: Tue, 28 Oct 2025 13:14:56 -0400 Subject: [PATCH 1/2] sts: test tenant role permissions against bucket acls without any matching identity policy, test that an assumed role inherits acl-based permissions on the assuming user Signed-off-by: Casey Bodley --- s3tests/functional/test_sts.py | 95 ++++++++++++++++++++++++++++++++++ 1 file changed, 95 insertions(+) diff --git a/s3tests/functional/test_sts.py b/s3tests/functional/test_sts.py index 06de13e32..21ca70ae5 100644 --- a/s3tests/functional/test_sts.py +++ b/s3tests/functional/test_sts.py @@ -37,9 +37,12 @@ get_iam_path_prefix, make_iam_name, get_client, + get_main_user_id, + get_alt_client, get_alt_user_id, get_config_endpoint, get_new_bucket_name, + get_new_bucket, get_parameter_name, get_main_aws_access_key, get_main_aws_secret_key, @@ -55,6 +58,7 @@ get_azp, get_user_token ) +from .utils import (assert_raises, _get_status_and_error_code) log = logging.getLogger(__name__) @@ -394,6 +398,97 @@ def test_assume_role_allow_head_nonexistent(): status = e.response['ResponseMetadata']['HTTPStatusCode'] assert status == 404 +@pytest.mark.test_of_sts +@pytest.mark.fails_on_dbstore +def test_assume_role_owner_allow(): + iam_client=get_iam_client() + sts_client=get_sts_client() + sts_user_id=get_alt_user_id() + default_endpoint=get_config_endpoint() + role_name=get_parameter_name() + role_session_name=get_parameter_name() + + policy_document = '{"Version":"2012-10-17","Statement":[{"Effect":"Allow","Principal":{"AWS":["arn:aws:iam:::user/'+sts_user_id+'"]},"Action":["sts:AssumeRole"]}]}' + role_response = iam_client.create_role(Path='/', RoleName=role_name, AssumeRolePolicyDocument=policy_document) + + resp = sts_client.assume_role(RoleArn=role_response['Role']['Arn'], RoleSessionName=role_session_name) + + s3_client = boto3.client('s3', + aws_access_key_id = resp['Credentials']['AccessKeyId'], + aws_secret_access_key = resp['Credentials']['SecretAccessKey'], + aws_session_token = resp['Credentials']['SessionToken'], + endpoint_url=default_endpoint, + region_name='') + + # create a bucket with the alt user + bucket_name = get_new_bucket(get_alt_client()) + + # access allowed from role assumed by alt user + s3_client.get_bucket_location(Bucket=bucket_name) + +@pytest.mark.test_of_sts +@pytest.mark.fails_on_dbstore +def test_assume_role_nonowner_deny(): + iam_client=get_iam_client() + sts_client=get_sts_client() + sts_user_id=get_alt_user_id() + default_endpoint=get_config_endpoint() + role_name=get_parameter_name() + role_session_name=get_parameter_name() + + policy_document = '{"Version":"2012-10-17","Statement":[{"Effect":"Allow","Principal":{"AWS":["arn:aws:iam:::user/'+sts_user_id+'"]},"Action":["sts:AssumeRole"]}]}' + role_response = iam_client.create_role(Path='/', RoleName=role_name, AssumeRolePolicyDocument=policy_document) + + resp = sts_client.assume_role(RoleArn=role_response['Role']['Arn'], RoleSessionName=role_session_name) + + s3_client = boto3.client('s3', + aws_access_key_id = resp['Credentials']['AccessKeyId'], + aws_secret_access_key = resp['Credentials']['SecretAccessKey'], + aws_session_token = resp['Credentials']['SessionToken'], + endpoint_url=default_endpoint, + region_name='') + + # create a bucket with the main user + main_client = get_client() + bucket_name = get_new_bucket(main_client) + + # access denied from role assumed by alt user + e = assert_raises(ClientError, s3_client.get_bucket_location, Bucket=bucket_name) + assert (403, 'AccessDenied') == _get_status_and_error_code(e.response) + +@pytest.mark.test_of_sts +@pytest.mark.fails_on_dbstore +def test_assume_role_acl_allow(): + iam_client=get_iam_client() + sts_client=get_sts_client() + main_user_id=get_main_user_id() + sts_user_id=get_alt_user_id() + default_endpoint=get_config_endpoint() + role_name=get_parameter_name() + role_session_name=get_parameter_name() + + policy_document = '{"Version":"2012-10-17","Statement":[{"Effect":"Allow","Principal":{"AWS":["arn:aws:iam:::user/'+sts_user_id+'"]},"Action":["sts:AssumeRole"]}]}' + role_response = iam_client.create_role(Path='/', RoleName=role_name, AssumeRolePolicyDocument=policy_document) + + resp = sts_client.assume_role(RoleArn=role_response['Role']['Arn'], RoleSessionName=role_session_name) + + s3_client = boto3.client('s3', + aws_access_key_id = resp['Credentials']['AccessKeyId'], + aws_secret_access_key = resp['Credentials']['SecretAccessKey'], + aws_session_token = resp['Credentials']['SessionToken'], + endpoint_url=default_endpoint, + region_name='') + + # create a bucket with the main user and grant read acl to alt user + main_client = get_client() + bucket_name = get_new_bucket(main_client) + main_client.put_bucket_acl(Bucket=bucket_name, + GrantFullControl=f'id={main_user_id}', + GrantReadACP=f'id={sts_user_id}') + + # access allowed from role assumed by alt user + s3_client.get_bucket_location(Bucket=bucket_name) + @pytest.mark.webidentity_test @pytest.mark.token_claims_trust_policy_test From 3a6b20b25ff5462f9ae9a95154a378065ce14d5b Mon Sep 17 00:00:00 2001 From: Casey Bodley Date: Thu, 30 Oct 2025 11:21:04 -0400 Subject: [PATCH 2/2] sts: session policy tests bucket ops to avoid object acls when testing interactions between bucket/role/session policy, avoid PutObject because it creates object acls that grant GetObject permission replace PutObject with ListBucket, and GetObject with GetBucketLocation to test the same policy interactions Signed-off-by: Casey Bodley --- s3tests/functional/test_sts.py | 29 ++++++++++++----------------- 1 file changed, 12 insertions(+), 17 deletions(-) diff --git a/s3tests/functional/test_sts.py b/s3tests/functional/test_sts.py index 21ca70ae5..0e31a2656 100644 --- a/s3tests/functional/test_sts.py +++ b/s3tests/functional/test_sts.py @@ -1060,7 +1060,7 @@ def test_session_policy_bucket_policy_role_arn(): "Statement": [{ "Effect": "Allow", "Principal": {"AWS": "{}".format(rolearn)}, - "Action": ["s3:GetObject","s3:PutObject"], + "Action": ["s3:GetBucketLocation","s3:ListBucket"], "Resource": [ "{}".format(resource1), "{}".format(resource2) @@ -1068,7 +1068,7 @@ def test_session_policy_bucket_policy_role_arn(): }] }) s3client_iamcreds.put_bucket_policy(Bucket=bucket_name_1, Policy=bucket_policy) - session_policy = "{\"Version\":\"2012-10-17\",\"Statement\":{\"Effect\":\"Allow\",\"Action\":[\"s3:PutObject\"],\"Resource\":[\"arn:aws:s3:::test1\",\"arn:aws:s3:::test1/*\"]}}" + session_policy = "{\"Version\":\"2012-10-17\",\"Statement\":{\"Effect\":\"Allow\",\"Action\":[\"s3:ListBucket\"],\"Resource\":[\"arn:aws:s3:::test1\",\"arn:aws:s3:::test1/*\"]}}" resp=sts_client.assume_role_with_web_identity(RoleArn=role_response['Role']['Arn'],RoleSessionName=role_session_name,WebIdentityToken=token,Policy=session_policy) assert resp['ResponseMetadata']['HTTPStatusCode'] == 200 @@ -1080,15 +1080,12 @@ def test_session_policy_bucket_policy_role_arn(): endpoint_url=default_endpoint, region_name='', ) - bucket_body = 'this is a test file' - s3_put_obj = s3_client.put_object(Body=bucket_body, Bucket=bucket_name_1, Key="test-1.txt") - assert s3_put_obj['ResponseMetadata']['HTTPStatusCode'] == 200 - try: - obj = s3_client.get_object(Bucket=bucket_name_1, Key="test-1.txt") - except ClientError as e: - s3object_error = e.response.get("Error", {}).get("Code") - assert s3object_error == 'AccessDenied' + s3_client.head_bucket(Bucket=bucket_name_1) + + # s3:GetBucketLocation not allowed by session policy + e = assert_raises(ClientError, s3_client.get_bucket_location, Bucket=bucket_name_1) + assert (403, 'AccessDenied') == _get_status_and_error_code(e.response) oidc_remove=iam_client.delete_open_id_connect_provider( OpenIDConnectProviderArn=oidc_arn @@ -1136,7 +1133,7 @@ def test_session_policy_bucket_policy_session_arn(): "Statement": [{ "Effect": "Allow", "Principal": {"AWS": "{}".format(rolesessionarn)}, - "Action": ["s3:GetObject","s3:PutObject"], + "Action": ["s3:GetBucketLocation","s3:ListBucket"], "Resource": [ "{}".format(resource1), "{}".format(resource2) @@ -1144,7 +1141,7 @@ def test_session_policy_bucket_policy_session_arn(): }] }) s3client_iamcreds.put_bucket_policy(Bucket=bucket_name_1, Policy=bucket_policy) - session_policy = "{\"Version\":\"2012-10-17\",\"Statement\":{\"Effect\":\"Allow\",\"Action\":[\"s3:PutObject\"],\"Resource\":[\"arn:aws:s3:::test1\",\"arn:aws:s3:::test1/*\"]}}" + session_policy = "{\"Version\":\"2012-10-17\",\"Statement\":{\"Effect\":\"Allow\",\"Action\":[\"s3:ListBucket\"],\"Resource\":[\"arn:aws:s3:::test1\",\"arn:aws:s3:::test1/*\"]}}" resp=sts_client.assume_role_with_web_identity(RoleArn=role_response['Role']['Arn'],RoleSessionName=role_session_name,WebIdentityToken=token,Policy=session_policy) assert resp['ResponseMetadata']['HTTPStatusCode'] == 200 @@ -1156,13 +1153,11 @@ def test_session_policy_bucket_policy_session_arn(): endpoint_url=default_endpoint, region_name='', ) - bucket_body = 'this is a test file' - s3_put_obj = s3_client.put_object(Body=bucket_body, Bucket=bucket_name_1, Key="test-1.txt") - assert s3_put_obj['ResponseMetadata']['HTTPStatusCode'] == 200 + s3_client.head_bucket(Bucket=bucket_name_1) - s3_get_obj = s3_client.get_object(Bucket=bucket_name_1, Key="test-1.txt") - assert s3_get_obj['ResponseMetadata']['HTTPStatusCode'] == 200 + # s3:GetBucketLocation allowed by bucket policy + s3_client.get_bucket_location(Bucket=bucket_name_1) oidc_remove=iam_client.delete_open_id_connect_provider( OpenIDConnectProviderArn=oidc_arn