diff --git a/tests/common/constants.py b/tests/common/constants.py index 5b4450f4d065..f53d4822824a 100644 --- a/tests/common/constants.py +++ b/tests/common/constants.py @@ -1,5 +1,7 @@ # SPDX-License-Identifier: Apache-2.0 +import jwt + # Pytest Fixture Constants REMOTE_ADDR = "1.2.3.4" @@ -182,3 +184,44 @@ "CJraWQiOiJmMTMzOGNhMjY4MzU4NjNmNjcxNDAzOTQxNzM4YTdiNDllNzQwZmMwIiwidH" "lwIjoiSldUIn0.wlPNSE6eTFvznJawgpa6cHC3a8sU5_VBH8si9h-sgi0" ) + +""" + { + "iss": "https://example-org.semaphoreci.com", + "aud": "pypi", + "jti": "test-jti", + "org": "example-org", + "org_id": "a1b2c3d4-e5f6-7890-abcd-ef1234567890", + "prj": "example-project", + "prj_id": "b2c3d4e5-f6a7-8901-bcde-f01234567891", + "repo": "myrepo", + "repo_slug": "owner/myrepo", + "sub": "org:example-org:project:uuid:repo:myrepo:ref_type:branch:ref:main", + "ref": "main", + "ref_type": "branch", + "iat": 1650663865, + "nbf": 1650663265, + "exp": 1650664165 + } +""" +DUMMY_SEMAPHORE_OIDC_JWT = jwt.encode( + { + "iss": "https://example-org.semaphoreci.com", + "aud": "pypi", + "jti": "test-jti", + "org": "example-org", + "org_id": "a1b2c3d4-e5f6-7890-abcd-ef1234567890", + "prj": "example-project", + "prj_id": "b2c3d4e5-f6a7-8901-bcde-f01234567891", + "repo": "myrepo", + "repo_slug": "owner/myrepo", + "sub": "org:example-org:project:uuid:repo:myrepo:ref_type:branch:ref:main", + "ref": "main", + "ref_type": "branch", + "iat": 1650663865, + "nbf": 1650663265, + "exp": 1650664165, + }, + "secret", + algorithm="HS256", +) diff --git a/tests/common/db/oidc.py b/tests/common/db/oidc.py index f81202ab926d..1510718aa3ba 100644 --- a/tests/common/db/oidc.py +++ b/tests/common/db/oidc.py @@ -12,6 +12,8 @@ PendingGitHubPublisher, PendingGitLabPublisher, PendingGooglePublisher, + PendingSemaphorePublisher, + SemaphorePublisher, ) from .accounts import UserFactory @@ -122,3 +124,33 @@ class Meta: actor = factory.Faker("pystr", max_chars=12) actor_id = factory.Faker("uuid4") added_by = factory.SubFactory(UserFactory) + + +class SemaphorePublisherFactory(WarehouseFactory): + class Meta: + model = SemaphorePublisher + + id = factory.Faker("uuid4", cast_to=None) + organization = factory.Faker("pystr", max_chars=12) + semaphore_organization_id = factory.Faker("uuid4") + project = factory.Faker("pystr", max_chars=12) + semaphore_project_id = factory.Faker("uuid4") + repo_slug = factory.LazyAttribute( + lambda obj: f"{obj.organization}/{obj.project}-repo" + ) + + +class PendingSemaphorePublisherFactory(WarehouseFactory): + class Meta: + model = PendingSemaphorePublisher + + id = factory.Faker("uuid4", cast_to=None) + project_name = factory.Faker("pystr", max_chars=12) + organization = factory.Faker("pystr", max_chars=12) + semaphore_organization_id = factory.Faker("uuid4") + project = factory.Faker("pystr", max_chars=12) + semaphore_project_id = factory.Faker("uuid4") + repo_slug = factory.LazyAttribute( + lambda obj: f"{obj.organization}/{obj.project}-repo" + ) + added_by = factory.SubFactory(UserFactory) diff --git a/tests/unit/oidc/forms/test_semaphore.py b/tests/unit/oidc/forms/test_semaphore.py new file mode 100644 index 000000000000..de7e7c9b99c5 --- /dev/null +++ b/tests/unit/oidc/forms/test_semaphore.py @@ -0,0 +1,262 @@ +# SPDX-License-Identifier: Apache-2.0 + +import pretend + +from webob.multidict import MultiDict + +from warehouse.oidc import forms + + +class TestPendingSemaphorePublisherForm: + def test_validate(self, pyramid_request): + form = forms.PendingSemaphorePublisherForm( + MultiDict( + { + "organization": "example-org", + "semaphore_organization_id": "org-id-1234", + "project": "example-project", + "semaphore_project_id": "proj-id-5678", + "repo_slug": "owner/repo", + "project_name": "example-pypi-project", + } + ), + route_url=pyramid_request.route_url, + check_project_name=lambda name: True, + user=pretend.stub(), + ) + + assert form.validate() + + def test_validate_organization_required(self, pyramid_request): + form = forms.PendingSemaphorePublisherForm( + MultiDict( + { + "project": "example-project", + "repo_slug": "owner/repo", + "project_name": "example-pypi-project", + } + ), + route_url=pyramid_request.route_url, + check_project_name=lambda name: True, + user=pretend.stub(), + ) + + assert not form.validate() + assert "organization" in form.errors + + def test_validate_project_required(self, pyramid_request): + form = forms.PendingSemaphorePublisherForm( + MultiDict( + { + "organization": "example-org", + "repo_slug": "owner/repo", + "project_name": "example-pypi-project", + } + ), + route_url=pyramid_request.route_url, + check_project_name=lambda name: True, + user=pretend.stub(), + ) + + assert not form.validate() + assert "project" in form.errors + + def test_validate_repo_slug_required(self, pyramid_request): + form = forms.PendingSemaphorePublisherForm( + MultiDict( + { + "organization": "example-org", + "project": "example-project", + "project_name": "example-pypi-project", + } + ), + route_url=pyramid_request.route_url, + check_project_name=lambda name: True, + user=pretend.stub(), + ) + + assert not form.validate() + assert "repo_slug" in form.errors + + def test_validate_repo_slug_format(self, pyramid_request): + form = forms.PendingSemaphorePublisherForm( + MultiDict( + { + "organization": "example-org", + "project": "example-project", + "repo_slug": "invalid-format", + "project_name": "example-pypi-project", + } + ), + route_url=pyramid_request.route_url, + check_project_name=lambda name: True, + user=pretend.stub(), + ) + + assert not form.validate() + assert "repo_slug" in form.errors + + def test_validate_organization_format(self, pyramid_request): + form = forms.PendingSemaphorePublisherForm( + MultiDict( + { + "organization": "invalid org!", + "project": "example-project", + "repo_slug": "owner/repo", + "project_name": "example-pypi-project", + } + ), + route_url=pyramid_request.route_url, + check_project_name=lambda name: True, + user=pretend.stub(), + ) + + assert not form.validate() + assert "organization" in form.errors + + def test_validate_organization_id_required(self, pyramid_request): + form = forms.PendingSemaphorePublisherForm( + MultiDict( + { + "organization": "example-org", + "project": "example-project", + "semaphore_project_id": "proj-id-5678", + "repo_slug": "owner/repo", + "project_name": "example-pypi-project", + } + ), + route_url=pyramid_request.route_url, + check_project_name=lambda name: True, + user=pretend.stub(), + ) + + assert not form.validate() + assert "semaphore_organization_id" in form.errors + + def test_validate_project_id_required(self, pyramid_request): + form = forms.PendingSemaphorePublisherForm( + MultiDict( + { + "organization": "example-org", + "semaphore_organization_id": "org-id-1234", + "project": "example-project", + "repo_slug": "owner/repo", + "project_name": "example-pypi-project", + } + ), + route_url=pyramid_request.route_url, + check_project_name=lambda name: True, + user=pretend.stub(), + ) + + assert not form.validate() + assert "semaphore_project_id" in form.errors + + def test_provider_property(self, pyramid_request): + form = forms.PendingSemaphorePublisherForm( + data={ + "organization": "example-org", + "organization_id": "org-id-1234", + "project": "example-project", + "project_id": "proj-id-5678", + "repo_slug": "owner/repo", + "project_name": "example-pypi-project", + }, + route_url=pyramid_request.route_url, + check_project_name=lambda name: True, + user=pretend.stub(), + ) + + assert form.provider == "semaphore" + + +class TestSemaphorePublisherForm: + def test_validate(self): + form = forms.SemaphorePublisherForm( + MultiDict( + { + "organization": "example-org", + "semaphore_organization_id": "org-id-1234", + "project": "example-project", + "semaphore_project_id": "proj-id-5678", + "repo_slug": "owner/repo", + } + ) + ) + + assert form.validate() + + def test_validate_organization_required(self): + form = forms.SemaphorePublisherForm( + MultiDict( + { + "semaphore_organization_id": "org-id-1234", + "project": "example-project", + "semaphore_project_id": "proj-id-5678", + "repo_slug": "owner/repo", + } + ) + ) + + assert not form.validate() + assert "organization" in form.errors + + def test_validate_project_required(self): + form = forms.SemaphorePublisherForm( + MultiDict( + { + "organization": "example-org", + "semaphore_organization_id": "org-id-1234", + "semaphore_project_id": "proj-id-5678", + "repo_slug": "owner/repo", + } + ) + ) + + assert not form.validate() + assert "project" in form.errors + + def test_validate_repo_slug_required(self): + form = forms.SemaphorePublisherForm( + MultiDict( + { + "organization": "example-org", + "semaphore_organization_id": "org-id-1234", + "project": "example-project", + "semaphore_project_id": "proj-id-5678", + } + ) + ) + + assert not form.validate() + assert "repo_slug" in form.errors + + def test_validate_organization_id_required(self): + form = forms.SemaphorePublisherForm( + MultiDict( + { + "organization": "example-org", + "project": "example-project", + "semaphore_project_id": "proj-id-5678", + "repo_slug": "owner/repo", + } + ) + ) + + assert not form.validate() + assert "semaphore_organization_id" in form.errors + + def test_validate_project_id_required(self): + form = forms.SemaphorePublisherForm( + MultiDict( + { + "organization": "example-org", + "semaphore_organization_id": "org-id-1234", + "project": "example-project", + "repo_slug": "owner/repo", + } + ) + ) + + assert not form.validate() + assert "semaphore_project_id" in form.errors diff --git a/tests/unit/oidc/models/test_semaphore.py b/tests/unit/oidc/models/test_semaphore.py new file mode 100644 index 000000000000..708f274a2c27 --- /dev/null +++ b/tests/unit/oidc/models/test_semaphore.py @@ -0,0 +1,526 @@ +# SPDX-License-Identifier: Apache-2.0 + +import pytest + +from tests.common.db.accounts import UserFactory +from warehouse.oidc import errors +from warehouse.oidc.models import semaphore + + +class TestSemaphorePublisher: + def test_publisher_name(self): + publisher = semaphore.SemaphorePublisher( + organization="example-org", + semaphore_organization_id="org-id-1234", + project="example-project", + semaphore_project_id="proj-id-5678", + repo_slug="owner/repo", + ) + + assert publisher.publisher_name == "SemaphoreCI" + + def test_publisher_base_url(self): + publisher = semaphore.SemaphorePublisher( + organization="example-org", + semaphore_organization_id="org-id-1234", + project="example-project", + semaphore_project_id="proj-id-5678", + repo_slug="owner/repo", + ) + + assert publisher.publisher_base_url is None + + def test_publisher_url(self): + publisher = semaphore.SemaphorePublisher( + organization="example-org", + semaphore_organization_id="org-id-1234", + project="example-project", + semaphore_project_id="proj-id-5678", + repo_slug="owner/repo", + ) + + assert publisher.publisher_url() is None + assert publisher.publisher_url({"ref": "refs/heads/main"}) is None + + def test_stored_claims(self): + publisher = semaphore.SemaphorePublisher( + organization="example-org", + semaphore_organization_id="org-id-1234", + project="example-project", + semaphore_project_id="proj-id-5678", + repo_slug="owner/repo", + ) + + assert publisher.stored_claims() == {"ref": None, "ref_type": None} + assert publisher.stored_claims( + {"ref": "refs/heads/main", "ref_type": "branch"} + ) == { + "ref": "refs/heads/main", + "ref_type": "branch", + } + + def test_sub(self): + publisher = semaphore.SemaphorePublisher( + organization="example-org", + semaphore_organization_id="org-id-1234", + project="example-project", + semaphore_project_id="proj-id-5678", + repo_slug="owner/repo", + ) + + assert publisher.sub == "repo_slug:owner/repo" + + def test_repo_slug(self): + publisher = semaphore.SemaphorePublisher( + organization="example-org", + semaphore_organization_id="org-id-1234", + project="example-project", + semaphore_project_id="proj-id-5678", + repo_slug="owner/repo", + ) + + assert publisher.repo_slug == "owner/repo" + + def test_repo(self): + publisher = semaphore.SemaphorePublisher( + organization="example-org", + semaphore_organization_id="org-id-1234", + project="example-project", + semaphore_project_id="proj-id-5678", + repo_slug="owner/repo", + ) + + assert publisher.repo == "repo" + + def test_org(self): + publisher = semaphore.SemaphorePublisher( + organization="example-org", + semaphore_organization_id="org-id-1234", + project="example-project", + semaphore_project_id="proj-id-5678", + repo_slug="owner/repo", + ) + + assert publisher.org == "example-org" + + def test_prj(self): + publisher = semaphore.SemaphorePublisher( + organization="example-org", + semaphore_organization_id="org-id-1234", + project="example-project", + semaphore_project_id="proj-id-5678", + repo_slug="owner/repo", + ) + + assert publisher.prj == "example-project" + + def test_org_id(self): + publisher = semaphore.SemaphorePublisher( + organization="example-org", + semaphore_organization_id="org-id-1234", + project="example-project", + semaphore_project_id="proj-id-5678", + repo_slug="owner/repo", + ) + + assert publisher.org_id == "org-id-1234" + + def test_prj_id(self): + publisher = semaphore.SemaphorePublisher( + organization="example-org", + semaphore_organization_id="org-id-1234", + project="example-project", + semaphore_project_id="proj-id-5678", + repo_slug="owner/repo", + ) + + assert publisher.prj_id == "proj-id-5678" + + def test_stringifies_as_repository(self): + publisher = semaphore.SemaphorePublisher( + organization="example-org", + semaphore_organization_id="org-id-1234", + project="example-project", + semaphore_project_id="proj-id-5678", + repo_slug="owner/repo", + ) + + assert str(publisher) == "owner/repo" + + def test_semaphore_publisher_admin_details(self): + publisher = semaphore.SemaphorePublisher( + organization="example-org", + semaphore_organization_id="org-id-1234", + project="example-project", + semaphore_project_id="proj-id-5678", + repo_slug="owner/repo", + ) + + assert publisher.admin_details == [ + ("Organization", "example-org"), + ("Organization ID", "org-id-1234"), + ("Project", "example-project"), + ("Project ID", "proj-id-5678"), + ("Repository", "owner/repo"), + ] + + def test_semaphore_publisher_all_known_claims(self): + assert semaphore.SemaphorePublisher.all_known_claims() == { + # verifiable claims + "sub", + "org", + "org_id", + "prj", + "prj_id", + "repo_slug", + "jti", + # preverified claims + "iss", + "iat", + "nbf", + "exp", + "aud", + # unchecked claims + "repo", + "wf_id", + "ppl_id", + "job_id", + "branch", + "pr_branch", + "pr", + "ref", + "ref_type", + "tag", + "job_type", + "trg", + "sub127", + } + + def test_semaphore_publisher_lookup_by_claims(self, db_request): + publisher = semaphore.SemaphorePublisher( + organization="example-org", + semaphore_organization_id="org-id-1234", + project="example-project", + semaphore_project_id="proj-id-5678", + repo_slug="owner/repo", + ) + db_request.db.add(publisher) + db_request.db.flush() + + signed_claims = { + "org": "example-org", + "org_id": "org-id-1234", + "prj": "example-project", + "prj_id": "proj-id-5678", + "repo_slug": "owner/repo", + } + + found_publisher = semaphore.SemaphorePublisher.lookup_by_claims( + db_request.db, signed_claims + ) + + assert found_publisher == publisher + + def test_semaphore_publisher_lookup_by_claims_not_found(self, db_request): + signed_claims = { + "org": "example-org", + "org_id": "org-id-1234", + "prj": "nonexistent-project", + "prj_id": "proj-id-9999", + "repo_slug": "owner/repo", + } + + with pytest.raises(errors.InvalidPublisherError) as exc: + semaphore.SemaphorePublisher.lookup_by_claims(db_request.db, signed_claims) + + assert str(exc.value) == "Publisher with matching claims was not found" + + def test_semaphore_publisher_lookup_missing_org(self, db_request): + signed_claims = { + "org_id": "org-id-1234", + "prj": "example-project", + "prj_id": "proj-id-5678", + "repo_slug": "owner/repo", + } + + with pytest.raises(errors.InvalidPublisherError) as exc: + semaphore.SemaphorePublisher.lookup_by_claims(db_request.db, signed_claims) + + assert "Missing required claims" in str(exc.value) + + def test_semaphore_publisher_lookup_missing_prj(self, db_request): + signed_claims = { + "org": "example-org", + "org_id": "org-id-1234", + "prj_id": "proj-id-5678", + "repo_slug": "owner/repo", + } + + with pytest.raises(errors.InvalidPublisherError) as exc: + semaphore.SemaphorePublisher.lookup_by_claims(db_request.db, signed_claims) + + assert "Missing required claims" in str(exc.value) + + def test_semaphore_publisher_lookup_missing_repo_slug(self, db_request): + signed_claims = { + "org": "example-org", + "org_id": "org-id-1234", + "prj": "example-project", + "prj_id": "proj-id-5678", + } + + with pytest.raises(errors.InvalidPublisherError) as exc: + semaphore.SemaphorePublisher.lookup_by_claims(db_request.db, signed_claims) + + assert "Missing required claims" in str(exc.value) + + def test_semaphore_publisher_lookup_missing_org_id(self, db_request): + signed_claims = { + "org": "example-org", + "prj": "example-project", + "prj_id": "proj-id-5678", + "repo_slug": "owner/repo", + } + + with pytest.raises(errors.InvalidPublisherError) as exc: + semaphore.SemaphorePublisher.lookup_by_claims(db_request.db, signed_claims) + + assert "Missing required claims" in str(exc.value) + + def test_semaphore_publisher_lookup_missing_prj_id(self, db_request): + signed_claims = { + "org": "example-org", + "org_id": "org-id-1234", + "prj": "example-project", + "repo_slug": "owner/repo", + } + + with pytest.raises(errors.InvalidPublisherError) as exc: + semaphore.SemaphorePublisher.lookup_by_claims(db_request.db, signed_claims) + + assert "Missing required claims" in str(exc.value) + + def test_check_sub_valid(self): + publisher = semaphore.SemaphorePublisher( + organization="example-org", + semaphore_organization_id="org-id-1234", + project="example-project", + semaphore_project_id="proj-id-5678", + repo_slug="owner/repo", + ) + + signed_claim = ( + "org:example-org:project:uuid-1234:repo:repo:" + "ref_type:branch:ref:refs/heads/main" + ) + assert semaphore._check_sub( + publisher.repo_slug, + signed_claim, + {}, + ) + + def test_check_sub_invalid(self): + publisher = semaphore.SemaphorePublisher( + organization="example-org", + semaphore_organization_id="org-id-1234", + project="example-project", + semaphore_project_id="proj-id-5678", + repo_slug="owner/repo", + ) + + signed_claim = ( + "org:example-org:project:uuid-1234:repo:different-repo:" + "ref_type:branch:ref:refs/heads/main" + ) + assert not semaphore._check_sub( + publisher.repo_slug, + signed_claim, + {}, + ) + + def test_check_sub_missing_repo(self): + publisher = semaphore.SemaphorePublisher( + organization="example-org", + semaphore_organization_id="org-id-1234", + project="example-project", + semaphore_project_id="proj-id-5678", + repo_slug="owner/repo", + ) + + signed_claim = "org:example-org:project:uuid-1234" + assert not semaphore._check_sub( + publisher.repo_slug, + signed_claim, + {}, + ) + + def test_check_sub_empty_claim(self): + publisher = semaphore.SemaphorePublisher( + organization="example-org", + semaphore_organization_id="org-id-1234", + project="example-project", + semaphore_project_id="proj-id-5678", + repo_slug="owner/repo", + ) + + assert not semaphore._check_sub(publisher.repo_slug, "", {}) + + def test_check_sub_empty_repo_value(self): + publisher = semaphore.SemaphorePublisher( + organization="example-org", + semaphore_organization_id="org-id-1234", + project="example-project", + semaphore_project_id="proj-id-5678", + repo_slug="owner/repo", + ) + + signed_claim = "org:example-org:project:uuid-1234:repo::ref_type:branch" + assert not semaphore._check_sub( + publisher.repo_slug, + signed_claim, + {}, + ) + + def test_check_sub_only_repo_prefix(self): + publisher = semaphore.SemaphorePublisher( + organization="example-org", + semaphore_organization_id="org-id-1234", + project="example-project", + semaphore_project_id="proj-id-5678", + repo_slug="owner/repo", + ) + + signed_claim = "org:example-org:project:uuid-1234:repo:" + assert not semaphore._check_sub( + publisher.repo_slug, + signed_claim, + {}, + ) + + def test_check_sub_single_part_after_repo(self): + publisher = semaphore.SemaphorePublisher( + organization="example-org", + semaphore_organization_id="org-id-1234", + project="example-project", + semaphore_project_id="proj-id-5678", + repo_slug="owner/repo", + ) + + signed_claim = "incomplete:repo:" + assert not semaphore._check_sub( + publisher.repo_slug, + signed_claim, + {}, + ) + + def test_jti_property(self): + publisher = semaphore.SemaphorePublisher( + organization="example-org", + semaphore_organization_id="org-id-1234", + project="example-project", + semaphore_project_id="proj-id-5678", + repo_slug="owner/repo", + ) + + assert publisher.jti == "placeholder" + + def test_attestation_identity(self): + publisher = semaphore.SemaphorePublisher( + organization="example-org", + semaphore_organization_id="org-id-1234", + project="example-project", + semaphore_project_id="proj-id-5678", + repo_slug="owner/repo", + ) + + assert publisher.attestation_identity is None + + def test_exists_true(self, db_request): + publisher = semaphore.SemaphorePublisher( + organization="example-org", + semaphore_organization_id="org-id-1234", + project="example-project", + semaphore_project_id="proj-id-5678", + repo_slug="owner/repo", + ) + db_request.db.add(publisher) + db_request.db.flush() + + assert publisher.exists(db_request.db) + + +class TestPendingSemaphorePublisher: + def test_reify(self, db_request): + user = UserFactory.create() + pending_publisher = semaphore.PendingSemaphorePublisher( + organization="example-org", + semaphore_organization_id="a1b2c3d4-e5f6-7890-abcd-ef1234567890", + project="example-project", + semaphore_project_id="b2c3d4e5-f6a7-8901-bcde-f01234567891", + repo_slug="owner/repo", + project_name="example-pypi-project", + added_by_id=user.id, + ) + + db_request.db.add(pending_publisher) + db_request.db.flush() + + publisher = pending_publisher.reify(db_request.db) + + assert isinstance(publisher, semaphore.SemaphorePublisher) + assert publisher.organization == "example-org" + assert ( + publisher.semaphore_organization_id + == "a1b2c3d4-e5f6-7890-abcd-ef1234567890" + ) + assert publisher.project == "example-project" + assert publisher.semaphore_project_id == "b2c3d4e5-f6a7-8901-bcde-f01234567891" + assert publisher.repo_slug == "owner/repo" + + # The pending publisher should be deleted + assert ( + db_request.db.query(semaphore.PendingSemaphorePublisher) + .filter_by(id=pending_publisher.id) + .count() + == 0 + ) + + def test_reify_existing_publisher(self, db_request): + user = UserFactory.create() + # Use matching IDs + org_id = "a1b2c3d4-e5f6-7890-abcd-ef1234567890" + proj_id = "b2c3d4e5-f6a7-8901-bcde-f01234567891" + + existing_publisher = semaphore.SemaphorePublisher( + organization="example-org", + semaphore_organization_id=org_id, + project="example-project", + semaphore_project_id=proj_id, + repo_slug="owner/repo", + ) + db_request.db.add(existing_publisher) + db_request.db.flush() + + pending_publisher = semaphore.PendingSemaphorePublisher( + organization="example-org", + semaphore_organization_id=org_id, + project="example-project", + semaphore_project_id=proj_id, + repo_slug="owner/repo", + project_name="example-pypi-project", + added_by_id=user.id, + ) + db_request.db.add(pending_publisher) + db_request.db.flush() + + publisher = pending_publisher.reify(db_request.db) + + assert publisher == existing_publisher + + # The pending publisher should be deleted + assert ( + db_request.db.query(semaphore.PendingSemaphorePublisher) + .filter_by(id=pending_publisher.id) + .count() + == 0 + ) diff --git a/tests/unit/oidc/test_utils.py b/tests/unit/oidc/test_utils.py index ab19a626dc04..d93fcf8a4e88 100644 --- a/tests/unit/oidc/test_utils.py +++ b/tests/unit/oidc/test_utils.py @@ -12,14 +12,17 @@ GitHubPublisherFactory, GitLabPublisherFactory, GooglePublisherFactory, + SemaphorePublisherFactory, ) from tests.common.db.organizations import OrganizationOIDCIssuerFactory from warehouse.oidc import errors, utils from warehouse.oidc.models import ( + SEMAPHORE_OIDC_ISSUER_URL_SUFFIX, ActiveStatePublisher, GitHubPublisher, GitLabPublisher, GooglePublisher, + SemaphorePublisher, ) from warehouse.oidc.utils import OIDC_PUBLISHER_CLASSES from warehouse.organizations.models import OIDCIssuerType @@ -298,6 +301,36 @@ def test_find_publisher_by_issuer_activestate( ) +def test_find_publisher_by_issuer_semaphore(db_request): + SemaphorePublisherFactory( + id="aaaaaaaa-aaaa-aaaa-aaaa-aaaaaaaaaaaa", + organization="example-org", + semaphore_organization_id="a1b2c3d4-e5f6-7890-abcd-ef1234567890", + project="example-project", + semaphore_project_id="b2c3d4e5-f6a7-8901-bcde-f01234567891", + repo_slug="owner/repo", + ) + + signed_claims = { + claim_name: "fake" for claim_name in SemaphorePublisher.all_known_claims() + } + signed_claims.update( + { + "org": "example-org", + "org_id": "a1b2c3d4-e5f6-7890-abcd-ef1234567890", + "prj": "example-project", + "prj_id": "b2c3d4e5-f6a7-8901-bcde-f01234567891", + "repo_slug": "owner/repo", + } + ) + + assert utils.find_publisher_by_issuer( + db_request.db, + f"https://example-org{SEMAPHORE_OIDC_ISSUER_URL_SUFFIX}", + signed_claims, + ).id == uuid.UUID("aaaaaaaa-aaaa-aaaa-aaaa-aaaaaaaaaaaa") + + def test_oidc_context_principals(): assert principals_for( utils.PublisherTokenContext(publisher=pretend.stub(id=17), claims=None) diff --git a/tests/unit/oidc/test_views.py b/tests/unit/oidc/test_views.py index 26990309d234..529076a127eb 100644 --- a/tests/unit/oidc/test_views.py +++ b/tests/unit/oidc/test_views.py @@ -33,7 +33,11 @@ from warehouse.packaging.models import Project from warehouse.rate_limiting.interfaces import IRateLimiter -from ...common.constants import DUMMY_ACTIVESTATE_OIDC_JWT, DUMMY_GITHUB_OIDC_JWT +from ...common.constants import ( + DUMMY_ACTIVESTATE_OIDC_JWT, + DUMMY_GITHUB_OIDC_JWT, + DUMMY_SEMAPHORE_OIDC_JWT, +) def test_ratelimiters(): @@ -82,6 +86,7 @@ def test_oidc_audience(): [ (DUMMY_GITHUB_OIDC_JWT, "github"), (DUMMY_ACTIVESTATE_OIDC_JWT, "activestate"), + (DUMMY_SEMAPHORE_OIDC_JWT, "semaphore"), ], ) def test_mint_token_from_oidc_not_enabled(token, service_name, request): diff --git a/warehouse/admin/flags.py b/warehouse/admin/flags.py index 83c5350a1b2b..4ff720c41eb7 100644 --- a/warehouse/admin/flags.py +++ b/warehouse/admin/flags.py @@ -20,6 +20,7 @@ class AdminFlagValue(enum.Enum): DISALLOW_GITLAB_OIDC = "disallow-gitlab-oidc" DISALLOW_GOOGLE_OIDC = "disallow-google-oidc" DISALLOW_ACTIVESTATE_OIDC = "disallow-activestate-oidc" + DISALLOW_SEMAPHORE_OIDC = "disallow-semaphore-oidc" READ_ONLY = "read-only" diff --git a/warehouse/migrations/versions/7a97c540ed60_add_semaphoreci_oidc_models.py b/warehouse/migrations/versions/7a97c540ed60_add_semaphoreci_oidc_models.py new file mode 100644 index 000000000000..397313f9a045 --- /dev/null +++ b/warehouse/migrations/versions/7a97c540ed60_add_semaphoreci_oidc_models.py @@ -0,0 +1,68 @@ +# SPDX-License-Identifier: Apache-2.0 +""" +Add SemaphoreCI OIDC models + +Revision ID: 7a97c540ed60 +Revises: a25f3d5186a9 +Create Date: 2025-11-04 00:00:00.000000 +""" + +import sqlalchemy as sa + +from alembic import op +from sqlalchemy.dialects import postgresql + +revision = "7a97c540ed60" +down_revision = "a25f3d5186a9" + + +def upgrade(): + op.create_table( + "semaphore_oidc_publishers", + sa.Column("organization", sa.String(), nullable=False), + sa.Column("semaphore_organization_id", sa.String(), nullable=False), + sa.Column("project", sa.String(), nullable=False), + sa.Column("semaphore_project_id", sa.String(), nullable=False), + sa.Column("repo_slug", sa.String(), nullable=False), + sa.Column("id", postgresql.UUID(as_uuid=True), nullable=False), + sa.ForeignKeyConstraint( + ["id"], + ["oidc_publishers.id"], + ), + sa.PrimaryKeyConstraint("id"), + sa.UniqueConstraint( + "organization", + "semaphore_organization_id", + "project", + "semaphore_project_id", + "repo_slug", + name="_semaphore_oidc_publisher_uc", + ), + ) + op.create_table( + "pending_semaphore_oidc_publishers", + sa.Column("organization", sa.String(), nullable=False), + sa.Column("semaphore_organization_id", sa.String(), nullable=False), + sa.Column("project", sa.String(), nullable=False), + sa.Column("semaphore_project_id", sa.String(), nullable=False), + sa.Column("repo_slug", sa.String(), nullable=False), + sa.Column("id", postgresql.UUID(as_uuid=True), nullable=False), + sa.ForeignKeyConstraint( + ["id"], + ["pending_oidc_publishers.id"], + ), + sa.PrimaryKeyConstraint("id"), + sa.UniqueConstraint( + "organization", + "semaphore_organization_id", + "project", + "semaphore_project_id", + "repo_slug", + name="_pending_semaphore_oidc_publisher_uc", + ), + ) + + +def downgrade(): + op.drop_table("pending_semaphore_oidc_publishers") + op.drop_table("semaphore_oidc_publishers") diff --git a/warehouse/oidc/__init__.py b/warehouse/oidc/__init__.py index 6d06917f3754..d119747c173e 100644 --- a/warehouse/oidc/__init__.py +++ b/warehouse/oidc/__init__.py @@ -6,6 +6,7 @@ from celery.schedules import crontab from warehouse.oidc.interfaces import IOIDCPublisherService +from warehouse.oidc.models import SEMAPHORE_OIDC_ISSUER_URL_SUFFIX from warehouse.oidc.services import OIDCPublisherServiceFactory from warehouse.oidc.tasks import compute_oidc_metrics, delete_expired_oidc_macaroons from warehouse.oidc.utils import ( @@ -62,6 +63,19 @@ def includeme(config: Configurator) -> None: name="activestate", ) + # Semaphore uses org-specific issuer URLs (https://.semaphoreci.com) + # The placeholder issuer URL here won't be used for verification; + # the actual issuer URL comes from the JWT itself + config.register_service_factory( + OIDCPublisherServiceFactory( + publisher="semaphore", + issuer_url=f"https://*{SEMAPHORE_OIDC_ISSUER_URL_SUFFIX}", + service_class=oidc_publisher_service_class, + ), + IOIDCPublisherService, + name="semaphore", + ) + # During deployments, we separate auth routes into their own subdomain # to simplify caching exclusion. auth = config.get_settings().get("auth.domain") diff --git a/warehouse/oidc/forms/__init__.py b/warehouse/oidc/forms/__init__.py index 1db3adc81e43..eb9bdd22d5fe 100644 --- a/warehouse/oidc/forms/__init__.py +++ b/warehouse/oidc/forms/__init__.py @@ -8,6 +8,10 @@ from warehouse.oidc.forms.github import GitHubPublisherForm, PendingGitHubPublisherForm from warehouse.oidc.forms.gitlab import GitLabPublisherForm, PendingGitLabPublisherForm from warehouse.oidc.forms.google import GooglePublisherForm, PendingGooglePublisherForm +from warehouse.oidc.forms.semaphore import ( + PendingSemaphorePublisherForm, + SemaphorePublisherForm, +) __all__ = [ "DeletePublisherForm", @@ -19,4 +23,6 @@ "PendingGooglePublisherForm", "ActiveStatePublisherForm", "PendingActiveStatePublisherForm", + "SemaphorePublisherForm", + "PendingSemaphorePublisherForm", ] diff --git a/warehouse/oidc/forms/semaphore.py b/warehouse/oidc/forms/semaphore.py new file mode 100644 index 000000000000..fe92eec2523d --- /dev/null +++ b/warehouse/oidc/forms/semaphore.py @@ -0,0 +1,74 @@ +# SPDX-License-Identifier: Apache-2.0 + +import wtforms + +from warehouse.oidc.forms._core import PendingPublisherMixin + + +class SemaphorePublisherBase(wtforms.Form): + __params__ = [ + "organization", + "semaphore_organization_id", + "project", + "semaphore_project_id", + "repo_slug", + ] + + organization = wtforms.StringField( + validators=[ + wtforms.validators.InputRequired(message="Specify organization name"), + wtforms.validators.Regexp( + r"^[a-zA-Z0-9](?:[a-zA-Z0-9-]*[a-zA-Z0-9])?$", + message="Invalid organization name", + ), + ] + ) + + semaphore_organization_id = wtforms.StringField( + validators=[ + wtforms.validators.InputRequired(message="Specify organization ID"), + ] + ) + + project = wtforms.StringField( + validators=[ + wtforms.validators.InputRequired(message="Specify project name"), + ] + ) + + semaphore_project_id = wtforms.StringField( + validators=[ + wtforms.validators.InputRequired(message="Specify project ID"), + ] + ) + + repo_slug = wtforms.StringField( + validators=[ + wtforms.validators.InputRequired(message="Specify repository (owner/repo)"), + wtforms.validators.Regexp( + r"^[^/]+/[^/]+$", + message="Invalid repository format, expected 'owner/repo'", + ), + ] + ) + + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + + +class PendingSemaphorePublisherForm(SemaphorePublisherBase, PendingPublisherMixin): + __params__ = SemaphorePublisherBase.__params__ + ["project_name"] + + def __init__(self, *args, route_url, check_project_name, user, **kwargs): + super().__init__(*args, **kwargs) + self._route_url = route_url + self._check_project_name = check_project_name + self._user = user + + @property + def provider(self) -> str: + return "semaphore" + + +class SemaphorePublisherForm(SemaphorePublisherBase): + pass diff --git a/warehouse/oidc/models/__init__.py b/warehouse/oidc/models/__init__.py index 9b57c62d0c1b..bf464ef9805e 100644 --- a/warehouse/oidc/models/__init__.py +++ b/warehouse/oidc/models/__init__.py @@ -21,6 +21,11 @@ GooglePublisher, PendingGooglePublisher, ) +from warehouse.oidc.models.semaphore import ( + SEMAPHORE_OIDC_ISSUER_URL_SUFFIX, + PendingSemaphorePublisher, + SemaphorePublisher, +) __all__ = [ "OIDCPublisher", @@ -29,12 +34,15 @@ "PendingGitLabPublisher", "PendingGooglePublisher", "PendingActiveStatePublisher", + "PendingSemaphorePublisher", "GitHubPublisher", "GitLabPublisher", "GooglePublisher", "ActiveStatePublisher", + "SemaphorePublisher", "ACTIVESTATE_OIDC_ISSUER_URL", "GITHUB_OIDC_ISSUER_URL", "GITLAB_OIDC_ISSUER_URL", "GOOGLE_OIDC_ISSUER_URL", + "SEMAPHORE_OIDC_ISSUER_URL_SUFFIX", ] diff --git a/warehouse/oidc/models/semaphore.py b/warehouse/oidc/models/semaphore.py new file mode 100644 index 000000000000..4067a3cfac6a --- /dev/null +++ b/warehouse/oidc/models/semaphore.py @@ -0,0 +1,278 @@ +# SPDX-License-Identifier: Apache-2.0 + +from __future__ import annotations + +import typing + +from typing import Any, Self +from uuid import UUID + +from sqlalchemy import ForeignKey, String, UniqueConstraint, and_, exists +from sqlalchemy.dialects.postgresql import UUID as PG_UUID +from sqlalchemy.orm import Mapped, Query, mapped_column + +from warehouse.oidc.errors import InvalidPublisherError +from warehouse.oidc.interfaces import SignedClaims +from warehouse.oidc.models._core import ( + CheckClaimCallable, + OIDCPublisher, + PendingOIDCPublisher, + check_claim_binary, + check_existing_jti, +) + +if typing.TYPE_CHECKING: + from sqlalchemy.orm import Session + + +SEMAPHORE_OIDC_ISSUER_URL_SUFFIX = ".semaphoreci.com" + + +def _check_sub( + ground_truth: str, + signed_claim: str, + _all_signed_claims: SignedClaims, + **_kwargs, +) -> bool: + # Semaphore's sub claim contains: + # org::project::repo::ref_type::ref: + # The :repo: field contains just the repository name (not owner/repo) + # ground_truth is in format "repo_slug:owner/repo", so we extract the repo_slug part + + # Extract the repo portion from the sub claim + if not signed_claim or ":repo:" not in signed_claim: + return False + + repo_in_sub = signed_claim.split(":repo:", 1)[1].split(":", 1)[0] + if not repo_in_sub: + return False + + # Extract repo_slug from ground_truth (format: "repo_slug:owner/repo") + repo_slug = ground_truth.removeprefix("repo_slug:") + + # Extract just the repo name from repo_slug (owner/repo -> repo) + repo_name = repo_slug.split("/")[-1] + + # Compare case-insensitively + return repo_in_sub.lower() == repo_name.lower() + + +class SemaphorePublisherMixin: + """ + Common functionality for both pending and concrete SemaphoreCI OIDC publishers. + """ + + organization: Mapped[str] = mapped_column(String, nullable=False) + # Note: We use semaphore_organization_id and semaphore_project_id to avoid + # naming conflicts with PendingOIDCPublisher.organization_id (which is a UUID FK + # to PyPI organizations). These store SemaphoreCI's own UUID identifiers as strings. + semaphore_organization_id: Mapped[str] = mapped_column(String, nullable=False) + project: Mapped[str] = mapped_column(String, nullable=False) + semaphore_project_id: Mapped[str] = mapped_column(String, nullable=False) + repo_slug: Mapped[str] = mapped_column(String, nullable=False) + + __required_verifiable_claims__: dict[str, CheckClaimCallable[Any]] = { + "sub": _check_sub, + "org": check_claim_binary(str.__eq__), + "org_id": check_claim_binary(str.__eq__), + "prj": check_claim_binary(str.__eq__), + "prj_id": check_claim_binary(str.__eq__), + "repo_slug": check_claim_binary(str.__eq__), + "jti": check_existing_jti, + } + + __unchecked_claims__ = { + "repo", + "wf_id", + "ppl_id", + "job_id", + "branch", + "pr_branch", + "pr", + "ref", + "ref_type", + "tag", + "job_type", + "trg", + "sub127", + } + + @classmethod + def lookup_by_claims(cls, session: Session, signed_claims: SignedClaims) -> Self: + org = signed_claims.get("org") + org_id = signed_claims.get("org_id") + prj = signed_claims.get("prj") + prj_id = signed_claims.get("prj_id") + repo_slug = signed_claims.get("repo_slug") + + if not org or not org_id or not prj or not prj_id or not repo_slug: + raise InvalidPublisherError( + "Missing required claims: 'org', 'org_id', 'prj', " + "'prj_id', or 'repo_slug'" + ) + + query: Query = Query(cls).filter_by( + organization=org, + semaphore_organization_id=org_id, + project=prj, + semaphore_project_id=prj_id, + repo_slug=repo_slug, + ) + publisher = query.with_session(session).one_or_none() + + if publisher is None: + raise InvalidPublisherError("Publisher with matching claims was not found") + + return publisher + + @property + def publisher_name(self) -> str: + return "SemaphoreCI" + + @property + def sub(self) -> str: + return f"repo_slug:{self.repo_slug}" + + @property + def repo(self) -> str: + # Extract just the repository name from owner/repo + return ( + self.repo_slug.split("/")[-1] if "/" in self.repo_slug else self.repo_slug + ) + + @property + def org(self) -> str: + return self.organization + + @property + def org_id(self) -> str: + return self.semaphore_organization_id + + @property + def prj(self) -> str: + return self.project + + @property + def prj_id(self) -> str: + return self.semaphore_project_id + + @property + def jti(self) -> str: + return "placeholder" + + @property + def publisher_base_url(self) -> str | None: + # Semaphore projects can be hosted on any git provider + # We return None since we don't have a canonical URL + return None + + def publisher_url(self, claims: SignedClaims | None = None) -> str | None: + # We don't have enough information to construct a canonical URL + # since Semaphore can work with any git provider + return None + + @property + def attestation_identity(self) -> None: + return None + + def stored_claims(self, claims: SignedClaims | None = None) -> dict: + claims_obj = claims if claims else {} + return { + "ref": claims_obj.get("ref"), + "ref_type": claims_obj.get("ref_type"), + } + + def __str__(self) -> str: + return self.repo_slug + + def exists(self, session: Session) -> bool: + return session.query( + exists().where( + and_( + self.__class__.organization == self.organization, + self.__class__.semaphore_organization_id + == self.semaphore_organization_id, + self.__class__.project == self.project, + self.__class__.semaphore_project_id == self.semaphore_project_id, + self.__class__.repo_slug == self.repo_slug, + ) + ) + ).scalar() + + @property + def admin_details(self) -> list[tuple[str, str]]: + return [ + ("Organization", self.organization), + ("Organization ID", self.semaphore_organization_id), + ("Project", self.project), + ("Project ID", self.semaphore_project_id), + ("Repository", self.repo_slug), + ] + + +class SemaphorePublisher(SemaphorePublisherMixin, OIDCPublisher): + __tablename__ = "semaphore_oidc_publishers" + __mapper_args__ = {"polymorphic_identity": "semaphore_oidc_publishers"} + __table_args__ = ( + UniqueConstraint( + "organization", + "semaphore_organization_id", + "project", + "semaphore_project_id", + "repo_slug", + name="_semaphore_oidc_publisher_uc", + ), + ) + + id: Mapped[UUID] = mapped_column( + PG_UUID(as_uuid=True), ForeignKey(OIDCPublisher.id), primary_key=True + ) + + +class PendingSemaphorePublisher(SemaphorePublisherMixin, PendingOIDCPublisher): + __tablename__ = "pending_semaphore_oidc_publishers" + __mapper_args__ = {"polymorphic_identity": "pending_semaphore_oidc_publishers"} + __table_args__ = ( # type: ignore[assignment] + UniqueConstraint( + "organization", + "semaphore_organization_id", + "project", + "semaphore_project_id", + "repo_slug", + name="_pending_semaphore_oidc_publisher_uc", + ), + ) + + id: Mapped[UUID] = mapped_column( + PG_UUID(as_uuid=True), ForeignKey(PendingOIDCPublisher.id), primary_key=True + ) + + def reify(self, session: Session) -> SemaphorePublisher: + """ + Returns a `SemaphorePublisher` for this `PendingSemaphorePublisher`, + deleting the `PendingSemaphorePublisher` in the process. + """ + + maybe_publisher = ( + session.query(SemaphorePublisher) + .filter( + SemaphorePublisher.organization == self.organization, + SemaphorePublisher.semaphore_organization_id + == self.semaphore_organization_id, + SemaphorePublisher.project == self.project, + SemaphorePublisher.semaphore_project_id == self.semaphore_project_id, + SemaphorePublisher.repo_slug == self.repo_slug, + ) + .one_or_none() + ) + + publisher = maybe_publisher or SemaphorePublisher( + organization=self.organization, + semaphore_organization_id=self.semaphore_organization_id, + project=self.project, + semaphore_project_id=self.semaphore_project_id, + repo_slug=self.repo_slug, + ) + + session.delete(self) + return publisher diff --git a/warehouse/oidc/utils.py b/warehouse/oidc/utils.py index 9635421809f9..a6b872cdad29 100644 --- a/warehouse/oidc/utils.py +++ b/warehouse/oidc/utils.py @@ -17,6 +17,7 @@ GITHUB_OIDC_ISSUER_URL, GITLAB_OIDC_ISSUER_URL, GOOGLE_OIDC_ISSUER_URL, + SEMAPHORE_OIDC_ISSUER_URL_SUFFIX, ActiveStatePublisher, GitHubPublisher, GitLabPublisher, @@ -27,6 +28,8 @@ PendingGitLabPublisher, PendingGooglePublisher, PendingOIDCPublisher, + PendingSemaphorePublisher, + SemaphorePublisher, ) from warehouse.organizations.models import OrganizationOIDCIssuer @@ -91,12 +94,17 @@ def find_publisher_by_issuer( Raises if no publisher can be found. """ - try: - publisher_cls = OIDC_PUBLISHER_CLASSES[issuer_url][pending] - except KeyError: - # This indicates a logic error, since we shouldn't have verified - # claims for an issuer that we don't recognize and support. - raise InvalidPublisherError(f"Issuer {issuer_url!r} is unsupported") + # Check if this is a SemaphoreCI issuer (org-specific URLs) + publisher_cls: type[OIDCPublisher | PendingOIDCPublisher] + if issuer_url.endswith(SEMAPHORE_OIDC_ISSUER_URL_SUFFIX): + publisher_cls = PendingSemaphorePublisher if pending else SemaphorePublisher + else: + try: + publisher_cls = OIDC_PUBLISHER_CLASSES[issuer_url][pending] + except KeyError: + # This indicates a logic error, since we shouldn't have verified + # claims for an issuer that we don't recognize and support. + raise InvalidPublisherError(f"Issuer {issuer_url!r} is unsupported") # Before looking up the publisher by claims, we need to ensure that all expected # claims are present in the JWT. diff --git a/warehouse/oidc/views.py b/warehouse/oidc/views.py index 4ca3d009f00a..b6656289b7a3 100644 --- a/warehouse/oidc/views.py +++ b/warehouse/oidc/views.py @@ -13,6 +13,7 @@ from pyramid.request import Request from pyramid.view import view_config +from warehouse.admin.flags import AdminFlagValue from warehouse.email import send_environment_ignored_in_trusted_publisher_email from warehouse.events.tags import EventTag from warehouse.macaroons import caveats @@ -21,7 +22,12 @@ from warehouse.metrics.interfaces import IMetricsService from warehouse.oidc.errors import InvalidPublisherError, ReusedTokenError from warehouse.oidc.interfaces import IOIDCPublisherService, SignedClaims -from warehouse.oidc.models import GitHubPublisher, OIDCPublisher, PendingOIDCPublisher +from warehouse.oidc.models import ( + SEMAPHORE_OIDC_ISSUER_URL_SUFFIX, + GitHubPublisher, + OIDCPublisher, + PendingOIDCPublisher, +) from warehouse.oidc.models.gitlab import GitLabPublisher from warehouse.oidc.services import OIDCPublisherService from warehouse.oidc.utils import ( @@ -141,6 +147,11 @@ def mint_token_from_oidc(request: Request): # Associate the given issuer claim with Warehouse's OIDCPublisherService. # First, try the standard issuers service_name = OIDC_ISSUER_SERVICE_NAMES.get(unverified_issuer) + # Check for Semaphore (org-specific issuer URLs) + if not service_name and unverified_issuer.endswith( + SEMAPHORE_OIDC_ISSUER_URL_SUFFIX + ): + service_name = "semaphore" # If not in global mapping, check for organization-specific custom issuer if not service_name: service_name = lookup_custom_issuer_type(request.db, unverified_issuer) @@ -159,7 +170,14 @@ def mint_token_from_oidc(request: Request): request=request, ) - if request.flags.disallow_oidc(OIDC_ISSUER_ADMIN_FLAGS.get(unverified_issuer)): + # Check if this issuer is disabled via admin flags + # For most issuers, we can look up the flag directly in the mapping + # For Semaphore, which uses org-specific issuer URLs, we check the suffix + admin_flag = OIDC_ISSUER_ADMIN_FLAGS.get(unverified_issuer) + if not admin_flag and unverified_issuer.endswith(SEMAPHORE_OIDC_ISSUER_URL_SUFFIX): + admin_flag = AdminFlagValue.DISALLOW_SEMAPHORE_OIDC + + if request.flags.disallow_oidc(admin_flag): return _invalid( errors=[ {