diff --git a/s3tests/functional/test_iam.py b/s3tests/functional/test_iam.py index 4fcb8c0b4..6c41355ab 100644 --- a/s3tests/functional/test_iam.py +++ b/s3tests/functional/test_iam.py @@ -24,6 +24,7 @@ get_alt_iam_client, get_alt_user_id, get_sts_client, + get_sts_user_id, ) from .utils import _get_status, _get_status_and_error_code from .iam import iam_root, iam_alt_root @@ -3004,3 +3005,168 @@ def test_get_account_summary_policy(iam_root): assert 'UsersQuota' in summary assert 'GroupsQuota' in summary assert 'AccessKeysPerUserQuota' in summary + +@pytest.mark.iam_account +def test_bucket_acl_set_owner_with_canonical_and_account_id(iam_root): + """Verify put_bucket_acl accepts AccessControlPolicy.Owner.ID specified as either + the canonical (legacy) user id or the numeric account id. + """ + s3 = get_iam_root_client(service_name='s3') + bucket = get_new_bucket(s3) + + # obtain both forms + canonical_id = 'testacct1root' # from vstart file + main_arn = iam_root.get_user()['User']['Arn'] + account_id = main_arn.removeprefix('arn:aws:iam::').removesuffix(':root') + assert canonical_id != account_id + + # base grants: FULL_CONTROL for owner (using canonical ID) + policy_canonical = { + 'Owner': {'ID': canonical_id}, + 'Grants': [ + { + 'Grantee': {'Type': 'CanonicalUser', 'ID': canonical_id}, + 'Permission': 'FULL_CONTROL' + } + ] + } + s3.put_bucket_acl(Bucket=bucket, AccessControlPolicy=policy_canonical) + resp = s3.get_bucket_acl(Bucket=bucket) + assert resp['Owner']['ID'] == canonical_id + + # now attempt with account id as Owner.ID + policy_account = { + 'Owner': {'ID': account_id}, # variant form + 'Grants': [ + { + 'Grantee': {'Type': 'CanonicalUser', 'ID': account_id}, + 'Permission': 'FULL_CONTROL' + } + ] + } + s3.put_bucket_acl(Bucket=bucket, AccessControlPolicy=policy_account) + resp2 = s3.get_bucket_acl(Bucket=bucket) + # Stored owner id should remain canonical id + assert resp2['Owner']['ID'] == account_id + + +@pytest.mark.iam_account +def test_object_acl_set_owner_with_canonical_and_account_id(iam_root): + """Verify put_object_acl accepts AccessControlPolicy.Owner.ID as canonical user id + and numeric account id. + """ + s3 = get_iam_root_client(service_name='s3') + bucket = get_new_bucket(s3) + canonical_id = 'testacct1root' + main_arn = iam_root.get_user()['User']['Arn'] + account_id = main_arn.removeprefix('arn:aws:iam::').removesuffix(':root') + + s3.put_object(Bucket=bucket, Key='obj', Body='data') + + # set ACL with canonical owner id + acl_canonical = { + 'Owner': {'ID': canonical_id}, + 'Grants': [ + { + 'Grantee': {'Type': 'CanonicalUser', 'ID': account_id}, + 'Permission': 'FULL_CONTROL' + } + ] + } + s3.put_object_acl(Bucket=bucket, Key='obj', AccessControlPolicy=acl_canonical) + r1 = s3.get_object_acl(Bucket=bucket, Key='obj') + assert r1['Owner']['ID'] == canonical_id + + # set ACL with account id as owner id + acl_account = { + 'Owner': {'ID': account_id}, + 'Grants': [ + { + 'Grantee': {'Type': 'CanonicalUser', 'ID': account_id}, + 'Permission': 'FULL_CONTROL' + } + ] + } + s3.put_object_acl(Bucket=bucket, Key='obj', AccessControlPolicy=acl_account) + r2 = s3.get_object_acl(Bucket=bucket, Key='obj') + assert r2['Owner']['ID'] == account_id + +@pytest.mark.iam_account +@pytest.mark.iam_cross_account +@pytest.mark.iam_role +@pytest.mark.role_policy +def test_cross_account_role_assume_cannot_change_acl_owner(iam_root, iam_alt_root): + """Verify that when a user from alt account assumes a role in the main account, + they cannot modify the ACL owner. + The put_bucket_acl should fail with AccessDenied or similar error. + """ + path = get_iam_path_prefix() + user_name = make_iam_name('AltUser') + role_name = make_iam_name('MyRole') + session_name = 'MySession' + s3_main = get_iam_root_client(service_name='s3') + bucket_name = get_new_bucket(s3_main) + + user = iam_alt_root.create_user(UserName=user_name, Path=path)['User'] + user_arn = user['Arn'] + key = iam_alt_root.create_access_key(UserName=user_name)['AccessKey'] + + trust_policy = json.dumps({ + 'Version': '2012-10-17', + 'Statement': [{ + 'Effect': 'Allow', + 'Action': 'sts:AssumeRole', + 'Principal': {'AWS': user_arn} + }] + }) + # returns MalformedPolicyDocument until the user arn starts working + role = retry_on('MalformedPolicyDocument', 10, iam_root.create_role, + RoleName=role_name, Path=path, AssumeRolePolicyDocument=trust_policy)['Role'] + role_arn = role['Arn'] + + sts = get_sts_client(aws_access_key_id=key['AccessKeyId'], + aws_secret_access_key=key['SecretAccessKey']) + + # returns InvalidClientTokenId or AccessDenied until the access key starts working + response = retry_on(('InvalidClientTokenId', 'AccessDenied'), 10, sts.assume_role, + RoleArn=role_arn, RoleSessionName=session_name) + creds = response['Credentials'] + + s3_assumed = get_iam_s3client(aws_access_key_id = creds['AccessKeyId'], + aws_secret_access_key = creds['SecretAccessKey'], + aws_session_token = creds['SessionToken']) + + policy_name = 'AllowS3AclOperations' + policy = json.dumps({ + 'Version': '2012-10-17', + 'Statement': [{ + 'Effect': 'Allow', + 'Action': ['s3:PutBucketAcl', 's3:GetBucketAcl'], + 'Resource': '*' + }] + }) + iam_root.put_role_policy(RoleName=role_name, PolicyName=policy_name, PolicyDocument=policy) + + alt_canonical_id = get_iam_alt_root_user_id() + modified_acl = { + 'Owner': {'ID': get_sts_user_id()}, + 'Grants': [ + { + 'Grantee': {'Type': 'CanonicalUser', 'ID': alt_canonical_id}, + 'Permission': 'FULL_CONTROL' + } + ] + } + + # Attempting to change owner should fail with AccessDenied + e = assert_raises(ClientError, s3_assumed.put_bucket_acl, + Bucket=bucket_name, AccessControlPolicy=modified_acl) + status, error_code = _get_status_and_error_code(e.response) + assert status == 403 + assert error_code == 'AccessDenied' + + # Attempting to original account id owner should not fail with AccessDenied + main_arn = iam_root.get_user()['User']['Arn'] + account_id = main_arn.removeprefix('arn:aws:iam::').removesuffix(':root') + modified_acl['Owner']['ID'] = account_id # try with actual account id form + retry_on('AccessDenied', 10, s3_assumed.put_bucket_acl, Bucket=bucket_name, AccessControlPolicy=modified_acl)