diff --git a/changes/9742.test.md b/changes/9742.test.md new file mode 100644 index 00000000000..7a7938b4893 --- /dev/null +++ b/changes/9742.test.md @@ -0,0 +1 @@ +Expand `check_permission_with_scope_chain()` test coverage with 15 additional edge-case scenarios diff --git a/tests/unit/manager/repositories/permission_controller/test_check_permission_with_scope_chain.py b/tests/unit/manager/repositories/permission_controller/test_check_permission_with_scope_chain.py index ef6e1522722..100f88e8247 100644 --- a/tests/unit/manager/repositories/permission_controller/test_check_permission_with_scope_chain.py +++ b/tests/unit/manager/repositories/permission_controller/test_check_permission_with_scope_chain.py @@ -1,6 +1,6 @@ """ Tests for PermissionDBSource.check_permission_with_scope_chain(). -Covers CTE-based scope chain traversal with AUTO/REF edge semantics and GLOBAL fallback. +Covers CTE-based scope chain traversal with AUTO/REF edge semantics. """ from __future__ import annotations @@ -36,6 +36,15 @@ from ai.backend.testutils.db import with_tables +@dataclass +class PermissionEntry: + """A single permission to create in permission_setup fixture.""" + + scope_key: str + operation: OperationType + entity_type: EntityType = EntityType.VFOLDER + + @dataclass class ScopeChainFixture: """Pre-built fixture data for scope chain tests.""" @@ -45,6 +54,7 @@ class ScopeChainFixture: domain_id: str = field(default_factory=lambda: str(uuid.uuid4())) project_id: str = field(default_factory=lambda: str(uuid.uuid4())) vfolder_id: str = field(default_factory=lambda: str(uuid.uuid4())) + user_scope_id: str = field(default_factory=lambda: str(uuid.uuid4())) class TestCheckPermissionWithScopeChain: @@ -171,16 +181,19 @@ async def permission_setup( "vfolder": (ScopeType.VFOLDER, fixture_ids.vfolder_id), "project": (ScopeType.PROJECT, fixture_ids.project_id), "domain": (ScopeType.DOMAIN, fixture_ids.domain_id), + "user_scope": (ScopeType.USER, fixture_ids.user_scope_id), } - for scope_key, operation in request.param: - scope_type, scope_id = scope_map[scope_key] + for entry in request.param: + if not isinstance(entry, PermissionEntry): + raise TypeError(f"Expected PermissionEntry, got {type(entry).__name__}: {entry!r}") + scope_type, scope_id = scope_map[entry.scope_key] async with db_with_rbac_tables.begin_session() as db_sess: perm = PermissionRow( role_id=fixture_ids.role_id, scope_type=scope_type, scope_id=scope_id, - entity_type=EntityType.VFOLDER, - operation=operation, + entity_type=entry.entity_type, + operation=entry.operation, ) db_sess.add(perm) await db_sess.flush() @@ -190,7 +203,7 @@ async def permission_setup( [ pytest.param([], OperationType.READ, False, id="no-permission"), pytest.param( - [("project", OperationType.READ)], + [PermissionEntry("project", OperationType.READ)], OperationType.READ, True, id="direct-scope-read", @@ -223,7 +236,7 @@ async def test_vfolder_auto_in_project( ("permission_setup", "check_op", "expected"), [ pytest.param( - [("domain", OperationType.UPDATE)], + [PermissionEntry("domain", OperationType.UPDATE)], OperationType.UPDATE, True, id="parent-scope-update", @@ -257,13 +270,13 @@ async def test_vfolder_auto_chain_to_domain( ("permission_setup", "check_op", "expected"), [ pytest.param( - [("project", OperationType.READ)], + [PermissionEntry("project", OperationType.READ)], OperationType.READ, False, id="ref-blocks-read", ), pytest.param( - [("project", OperationType.UPDATE)], + [PermissionEntry("project", OperationType.UPDATE)], OperationType.UPDATE, False, id="ref-blocks-update", @@ -296,7 +309,7 @@ async def test_vfolder_ref_in_project( ("permission_setup", "check_op", "expected"), [ pytest.param( - [("domain", OperationType.READ)], + [PermissionEntry("domain", OperationType.READ)], OperationType.READ, False, id="ref-stops-chain", @@ -354,7 +367,7 @@ async def user_with_inactive_role( @pytest.mark.parametrize( ("permission_setup",), [ - pytest.param([("project", OperationType.READ)], id="inactive-role-read"), + pytest.param([PermissionEntry("project", OperationType.READ)], id="inactive-role-read"), ], indirect=["permission_setup"], ) @@ -381,13 +394,13 @@ async def test_inactive_role_denied( ("permission_setup", "check_op", "expected"), [ pytest.param( - [("vfolder", OperationType.READ)], + [PermissionEntry("vfolder", OperationType.READ)], OperationType.READ, True, id="self-scope-read", ), pytest.param( - [("vfolder", OperationType.UPDATE)], + [PermissionEntry("vfolder", OperationType.UPDATE)], OperationType.UPDATE, True, id="self-scope-update", @@ -420,7 +433,7 @@ async def test_self_scope_permission( ("permission_setup", "check_op", "expected"), [ pytest.param( - [("vfolder", OperationType.READ)], + [PermissionEntry("vfolder", OperationType.READ)], OperationType.READ, True, id="self-scope-no-assoc", @@ -447,3 +460,596 @@ async def test_self_scope_without_any_association( operation=check_op, ) assert result is expected + + # ── A. Operation mismatch ── + + @pytest.mark.parametrize( + ("permission_setup", "check_op", "expected"), + [ + pytest.param( + [PermissionEntry("project", OperationType.READ)], + OperationType.UPDATE, + False, + id="project-read-perm-check-update", + ), + ], + indirect=["permission_setup"], + ) + async def test_operation_mismatch_direct_scope( + self, + db_source: PermissionDBSource, + user_with_active_role: ScopeChainFixture, + vfolder_in_project_auto: None, + permission_setup: None, + check_op: OperationType, + expected: bool, + ) -> None: + """Permission exists but for a different operation; check should fail.""" + f = user_with_active_role + result = await db_source.check_permission_with_scope_chain( + user_id=f.user_id, + target_element_ref=RBACElementRef( + element_type=RBACElementType.VFOLDER, + element_id=f.vfolder_id, + ), + operation=check_op, + ) + assert result is expected + + @pytest.mark.parametrize( + ("permission_setup", "check_op", "expected"), + [ + pytest.param( + [PermissionEntry("domain", OperationType.CREATE)], + OperationType.SOFT_DELETE, + False, + id="domain-create-perm-check-soft-delete", + ), + ], + indirect=["permission_setup"], + ) + async def test_operation_mismatch_chain_scope( + self, + db_source: PermissionDBSource, + user_with_active_role: ScopeChainFixture, + vfolder_in_project_auto: None, + project_in_domain_auto: None, + permission_setup: None, + check_op: OperationType, + expected: bool, + ) -> None: + """Operation mismatch at chain-traversed scope still returns False.""" + f = user_with_active_role + result = await db_source.check_permission_with_scope_chain( + user_id=f.user_id, + target_element_ref=RBACElementRef( + element_type=RBACElementType.VFOLDER, + element_id=f.vfolder_id, + ), + operation=check_op, + ) + assert result is expected + + # ── B. DELETED role ── + + @pytest.fixture + async def user_with_deleted_role( + self, + db_with_rbac_tables: ExtendedAsyncSAEngine, + fixture_ids: ScopeChainFixture, + ) -> ScopeChainFixture: + """Create a user with a deleted role.""" + async with db_with_rbac_tables.begin_session() as db_sess: + role = RoleRow( + id=fixture_ids.role_id, + name="deleted-role", + status=RoleStatus.DELETED, + ) + db_sess.add(role) + await db_sess.flush() + + user_role = UserRoleRow( + user_id=fixture_ids.user_id, + role_id=fixture_ids.role_id, + ) + db_sess.add(user_role) + await db_sess.flush() + + return fixture_ids + + @pytest.mark.parametrize( + ("permission_setup",), + [ + pytest.param([PermissionEntry("project", OperationType.READ)], id="deleted-role-read"), + ], + indirect=["permission_setup"], + ) + async def test_deleted_role_denied( + self, + db_source: PermissionDBSource, + user_with_deleted_role: ScopeChainFixture, + vfolder_in_project_auto: None, + permission_setup: None, + ) -> None: + """Deleted role does not grant any permission.""" + f = user_with_deleted_role + result = await db_source.check_permission_with_scope_chain( + user_id=f.user_id, + target_element_ref=RBACElementRef( + element_type=RBACElementType.VFOLDER, + element_id=f.vfolder_id, + ), + operation=OperationType.READ, + ) + assert result is False + + # ── C. No role assigned ── + + @pytest.fixture + async def user_with_unassigned_role( + self, + db_with_rbac_tables: ExtendedAsyncSAEngine, + fixture_ids: ScopeChainFixture, + ) -> ScopeChainFixture: + """Create a role with permission but do NOT assign it to the user.""" + async with db_with_rbac_tables.begin_session() as db_sess: + role = RoleRow( + id=fixture_ids.role_id, + name="unassigned-role", + ) + db_sess.add(role) + await db_sess.flush() + + perm = PermissionRow( + role_id=fixture_ids.role_id, + scope_type=ScopeType.PROJECT, + scope_id=fixture_ids.project_id, + entity_type=EntityType.VFOLDER, + operation=OperationType.READ, + ) + db_sess.add(perm) + await db_sess.flush() + + return fixture_ids + + async def test_no_role_assigned( + self, + db_source: PermissionDBSource, + user_with_unassigned_role: ScopeChainFixture, + vfolder_in_project_auto: None, + ) -> None: + """User with no role assignment gets no permission.""" + f = user_with_unassigned_role + result = await db_source.check_permission_with_scope_chain( + user_id=f.user_id, + target_element_ref=RBACElementRef( + element_type=RBACElementType.VFOLDER, + element_id=f.vfolder_id, + ), + operation=OperationType.READ, + ) + assert result is False + + # ── D. Multiple roles ── + + @pytest.fixture + async def user_with_two_roles( + self, + db_with_rbac_tables: ExtendedAsyncSAEngine, + fixture_ids: ScopeChainFixture, + ) -> tuple[ScopeChainFixture, uuid.UUID]: + """Create a user with two active roles. Returns (fixture, second_role_id).""" + second_role_id = uuid.uuid4() + async with db_with_rbac_tables.begin_session() as db_sess: + role1 = RoleRow( + id=fixture_ids.role_id, + name="role-1", + ) + role2 = RoleRow( + id=second_role_id, + name="role-2", + ) + db_sess.add_all([role1, role2]) + await db_sess.flush() + + ur1 = UserRoleRow( + user_id=fixture_ids.user_id, + role_id=fixture_ids.role_id, + ) + ur2 = UserRoleRow( + user_id=fixture_ids.user_id, + role_id=second_role_id, + ) + db_sess.add_all([ur1, ur2]) + await db_sess.flush() + + return fixture_ids, second_role_id + + @pytest.fixture + async def multi_role_permission_setup( + self, + db_with_rbac_tables: ExtendedAsyncSAEngine, + fixture_ids: ScopeChainFixture, + user_with_two_roles: tuple[ScopeChainFixture, uuid.UUID], + request: pytest.FixtureRequest, + ) -> None: + """Set up permissions for multi-role tests. Each entry: (role_key, scope_key, operation).""" + _, second_role_id = user_with_two_roles + role_map: dict[str, uuid.UUID] = { + "first": fixture_ids.role_id, + "second": second_role_id, + } + scope_map: dict[str, tuple[ScopeType, str]] = { + "project": (ScopeType.PROJECT, fixture_ids.project_id), + "domain": (ScopeType.DOMAIN, fixture_ids.domain_id), + } + for role_key, scope_key, operation in request.param: + role_id = role_map[role_key] + scope_type, scope_id = scope_map[scope_key] + async with db_with_rbac_tables.begin_session() as db_sess: + perm = PermissionRow( + role_id=role_id, + scope_type=scope_type, + scope_id=scope_id, + entity_type=EntityType.VFOLDER, + operation=operation, + ) + db_sess.add(perm) + await db_sess.flush() + + @pytest.mark.parametrize( + ("multi_role_permission_setup", "check_op", "expected"), + [ + pytest.param( + [("second", "project", OperationType.READ)], + OperationType.READ, + True, + id="one-role-has-read", + ), + pytest.param( + [ + ("first", "project", OperationType.READ), + ("second", "project", OperationType.READ), + ], + OperationType.UPDATE, + False, + id="both-roles-read-check-update", + ), + ], + indirect=["multi_role_permission_setup"], + ) + async def test_multiple_roles( + self, + db_source: PermissionDBSource, + user_with_two_roles: tuple[ScopeChainFixture, uuid.UUID], + vfolder_in_project_auto: None, + multi_role_permission_setup: None, + check_op: OperationType, + expected: bool, + ) -> None: + """Multiple roles: succeeds if any role matches, fails if none does.""" + f, _ = user_with_two_roles + result = await db_source.check_permission_with_scope_chain( + user_id=f.user_id, + target_element_ref=RBACElementRef( + element_type=RBACElementType.VFOLDER, + element_id=f.vfolder_id, + ), + operation=check_op, + ) + assert result is expected + + # ── E. Mixed edge chain (AUTO→REF, REF→AUTO) ── + + @pytest.fixture + async def project_in_domain_ref( + self, + db_with_rbac_tables: ExtendedAsyncSAEngine, + fixture_ids: ScopeChainFixture, + ) -> None: + """PROJECT referenced by DOMAIN (ref edge).""" + async with db_with_rbac_tables.begin_session() as db_sess: + assoc = AssociationScopesEntitiesRow( + scope_type=ScopeType.DOMAIN, + scope_id=fixture_ids.domain_id, + entity_type=EntityType.PROJECT, + entity_id=fixture_ids.project_id, + relation_type=RelationType.REF, + ) + db_sess.add(assoc) + await db_sess.flush() + + @pytest.mark.parametrize( + ("permission_setup", "check_op", "expected"), + [ + pytest.param( + [PermissionEntry("domain", OperationType.READ)], + OperationType.READ, + False, + id="auto-then-ref-blocks-domain", + ), + ], + indirect=["permission_setup"], + ) + async def test_auto_then_ref_blocks_chain( + self, + db_source: PermissionDBSource, + user_with_active_role: ScopeChainFixture, + vfolder_in_project_auto: None, + project_in_domain_ref: None, + permission_setup: None, + check_op: OperationType, + expected: bool, + ) -> None: + """VFOLDER→(AUTO)→PROJECT→(REF)→DOMAIN: REF in the middle blocks chain.""" + f = user_with_active_role + result = await db_source.check_permission_with_scope_chain( + user_id=f.user_id, + target_element_ref=RBACElementRef( + element_type=RBACElementType.VFOLDER, + element_id=f.vfolder_id, + ), + operation=check_op, + ) + assert result is expected + + @pytest.mark.parametrize( + ("permission_setup", "check_op", "expected"), + [ + pytest.param( + [PermissionEntry("project", OperationType.READ)], + OperationType.READ, + True, + id="auto-segment-before-ref-valid", + ), + ], + indirect=["permission_setup"], + ) + async def test_auto_segment_before_ref_valid( + self, + db_source: PermissionDBSource, + user_with_active_role: ScopeChainFixture, + vfolder_in_project_auto: None, + project_in_domain_ref: None, + permission_setup: None, + check_op: OperationType, + expected: bool, + ) -> None: + """VFOLDER→(AUTO)→PROJECT→(REF)→DOMAIN: PROJECT scope is still reachable.""" + f = user_with_active_role + result = await db_source.check_permission_with_scope_chain( + user_id=f.user_id, + target_element_ref=RBACElementRef( + element_type=RBACElementType.VFOLDER, + element_id=f.vfolder_id, + ), + operation=check_op, + ) + assert result is expected + + # ── F. Deep chain (3-level AUTO) ── + + @pytest.fixture + async def domain_in_user_scope_auto( + self, + db_with_rbac_tables: ExtendedAsyncSAEngine, + fixture_ids: ScopeChainFixture, + ) -> None: + """DOMAIN belongs to USER scope (auto edge) — 3rd level.""" + async with db_with_rbac_tables.begin_session() as db_sess: + assoc = AssociationScopesEntitiesRow( + scope_type=ScopeType.USER, + scope_id=fixture_ids.user_scope_id, + entity_type=EntityType.DOMAIN, + entity_id=fixture_ids.domain_id, + relation_type=RelationType.AUTO, + ) + db_sess.add(assoc) + await db_sess.flush() + + @pytest.mark.parametrize( + ("permission_setup", "check_op", "expected"), + [ + pytest.param( + [PermissionEntry("user_scope", OperationType.READ)], + OperationType.READ, + True, + id="three-level-auto-chain", + ), + ], + indirect=["permission_setup"], + ) + async def test_deep_three_level_auto_chain( + self, + db_source: PermissionDBSource, + user_with_active_role: ScopeChainFixture, + vfolder_in_project_auto: None, + project_in_domain_auto: None, + domain_in_user_scope_auto: None, + permission_setup: None, + check_op: OperationType, + expected: bool, + ) -> None: + """VFOLDER→(AUTO)→PROJECT→(AUTO)→DOMAIN→(AUTO)→USER: 3-level chain traversal.""" + f = user_with_active_role + result = await db_source.check_permission_with_scope_chain( + user_id=f.user_id, + target_element_ref=RBACElementRef( + element_type=RBACElementType.VFOLDER, + element_id=f.vfolder_id, + ), + operation=check_op, + ) + assert result is expected + + # ── G. Self-scope + AUTO edge combination ── + + @pytest.mark.parametrize( + ("permission_setup", "check_op", "expected"), + [ + pytest.param( + [PermissionEntry("vfolder", OperationType.READ)], + OperationType.READ, + True, + id="self-scope-with-auto-edge", + ), + ], + indirect=["permission_setup"], + ) + async def test_self_scope_with_auto_edge( + self, + db_source: PermissionDBSource, + user_with_active_role: ScopeChainFixture, + vfolder_in_project_auto: None, + permission_setup: None, + check_op: OperationType, + expected: bool, + ) -> None: + """Self-scope permission works regardless of AUTO edge presence.""" + f = user_with_active_role + result = await db_source.check_permission_with_scope_chain( + user_id=f.user_id, + target_element_ref=RBACElementRef( + element_type=RBACElementType.VFOLDER, + element_id=f.vfolder_id, + ), + operation=check_op, + ) + assert result is expected + + # ── H. User isolation ── + + @pytest.fixture + async def other_user_with_permission( + self, + db_with_rbac_tables: ExtendedAsyncSAEngine, + fixture_ids: ScopeChainFixture, + ) -> None: + """Create another user with a role and READ permission on the same project.""" + other_user_id = uuid.uuid4() + other_role_id = uuid.uuid4() + + async with db_with_rbac_tables.begin_session() as db_sess: + role = RoleRow( + id=other_role_id, + name="other-user-role", + ) + db_sess.add(role) + await db_sess.flush() + + user_role = UserRoleRow( + user_id=other_user_id, + role_id=other_role_id, + ) + db_sess.add(user_role) + await db_sess.flush() + + perm = PermissionRow( + role_id=other_role_id, + scope_type=ScopeType.PROJECT, + scope_id=fixture_ids.project_id, + entity_type=EntityType.VFOLDER, + operation=OperationType.READ, + ) + db_sess.add(perm) + await db_sess.flush() + + async def test_other_user_permission_isolation( + self, + db_source: PermissionDBSource, + fixture_ids: ScopeChainFixture, + vfolder_in_project_auto: None, + other_user_with_permission: None, + ) -> None: + """Permission granted to another user does not affect the target user.""" + result = await db_source.check_permission_with_scope_chain( + user_id=fixture_ids.user_id, + target_element_ref=RBACElementRef( + element_type=RBACElementType.VFOLDER, + element_id=fixture_ids.vfolder_id, + ), + operation=OperationType.READ, + ) + assert result is False + + # ── I. Multiple permissions on same role ── + + @pytest.mark.parametrize( + ("permission_setup", "check_op", "expected"), + [ + pytest.param( + [ + PermissionEntry("project", OperationType.READ), + PermissionEntry("project", OperationType.UPDATE), + ], + OperationType.READ, + True, + id="multi-perm-read-matches", + ), + pytest.param( + [ + PermissionEntry("project", OperationType.READ), + PermissionEntry("project", OperationType.UPDATE), + ], + OperationType.SOFT_DELETE, + False, + id="multi-perm-soft-delete-no-match", + ), + ], + indirect=["permission_setup"], + ) + async def test_multiple_permissions_on_same_role( + self, + db_source: PermissionDBSource, + user_with_active_role: ScopeChainFixture, + vfolder_in_project_auto: None, + permission_setup: None, + check_op: OperationType, + expected: bool, + ) -> None: + """Role with READ+UPDATE: READ matches, SOFT_DELETE does not.""" + f = user_with_active_role + result = await db_source.check_permission_with_scope_chain( + user_id=f.user_id, + target_element_ref=RBACElementRef( + element_type=RBACElementType.VFOLDER, + element_id=f.vfolder_id, + ), + operation=check_op, + ) + assert result is expected + + # ── J. Different entity_type permission ── + + @pytest.mark.parametrize( + ("permission_setup", "check_op", "expected"), + [ + pytest.param( + [PermissionEntry("project", OperationType.READ, EntityType.SESSION)], + OperationType.READ, + False, + id="session-perm-vfolder-check", + ), + ], + indirect=["permission_setup"], + ) + async def test_entity_type_mismatch( + self, + db_source: PermissionDBSource, + user_with_active_role: ScopeChainFixture, + vfolder_in_project_auto: None, + permission_setup: None, + check_op: OperationType, + expected: bool, + ) -> None: + """SESSION entity permission does not match VFOLDER entity check.""" + f = user_with_active_role + result = await db_source.check_permission_with_scope_chain( + user_id=f.user_id, + target_element_ref=RBACElementRef( + element_type=RBACElementType.VFOLDER, + element_id=f.vfolder_id, + ), + operation=check_op, + ) + assert result is expected