diff --git a/forms_pro/api/export/__init__.py b/forms_pro/api/export/__init__.py new file mode 100644 index 0000000..c6b2652 --- /dev/null +++ b/forms_pro/api/export/__init__.py @@ -0,0 +1,3 @@ +from .endpoints import export_submissions + +__all__ = ["export_submissions"] diff --git a/forms_pro/api/export.py b/forms_pro/api/export/endpoints.py similarity index 94% rename from forms_pro/api/export.py rename to forms_pro/api/export/endpoints.py index f60d643..a0e9091 100644 --- a/forms_pro/api/export.py +++ b/forms_pro/api/export/endpoints.py @@ -7,11 +7,13 @@ from werkzeug.utils import secure_filename from forms_pro.forms_pro.doctype.form.form import Form +from forms_pro.utils.permissions import require_permission _SUPPORTED_FILE_TYPES = ("CSV", "Excel") @frappe.whitelist() +@require_permission("Form", "write", param="form_id") def export_submissions(form_id: str, file_type: Literal["CSV", "Excel"] = "CSV") -> None: """ Export submissions for a form as a CSV or XLSX download. @@ -31,12 +33,6 @@ def export_submissions(form_id: str, file_type: Literal["CSV", "Excel"] = "CSV") frappe.ValidationError, ) - if not frappe.has_permission(doctype="Form", ptype="write", doc=form_id): - frappe.throw( - _("You do not have permission to export submissions for this form."), - frappe.PermissionError, - ) - form: Form = frappe.get_doc("Form", form_id) linked_doctype = form.linked_doctype diff --git a/forms_pro/tests/test_export.py b/forms_pro/api/export/test_export.py similarity index 94% rename from forms_pro/tests/test_export.py rename to forms_pro/api/export/test_export.py index ca10ce0..b1b1d66 100644 --- a/forms_pro/tests/test_export.py +++ b/forms_pro/api/export/test_export.py @@ -213,11 +213,20 @@ def test_user_without_write_permission_raises_permission_error(self) -> None: frappe.set_user(FORMS_PRO_TEST_USER) try: - with self.assertRaises(frappe.PermissionError): + with self.assertRaises(frappe.PermissionError) as ctx: export_submissions(form_id=form.name, file_type="CSV") + self.assertEqual(ctx.exception.http_status_code, 403) finally: frappe.set_user("Administrator") + def test_raises_404_when_form_missing(self) -> None: + """A missing form_id must surface as DoesNotExistError (404) so the + frontend renders the Not Found state, not Access Denied.""" + frappe.set_user("Administrator") + with self.assertRaises(frappe.DoesNotExistError) as ctx: + export_submissions(form_id="MISSING_FORM_XYZ", file_type="CSV") + self.assertEqual(ctx.exception.http_status_code, 404) + class TestExportAccessLog(IntegrationTestCase): """An export must be recorded in the Access Log for audit.""" @@ -301,7 +310,7 @@ def test_session_user_restored_when_exporter_raises(self) -> None: frappe.set_user(FORMS_PRO_TEST_USER) with patch( - "forms_pro.api.export.DataExporter.build_response", + "forms_pro.api.export.endpoints.DataExporter.build_response", side_effect=RuntimeError("boom"), ): with self.assertRaises(RuntimeError): diff --git a/forms_pro/api/form/__init__.py b/forms_pro/api/form/__init__.py new file mode 100644 index 0000000..5828597 --- /dev/null +++ b/forms_pro/api/form/__init__.py @@ -0,0 +1,29 @@ +from .endpoints import ( + add_form_access, + get_doctype_fields, + get_doctype_list, + get_form, + get_form_by_route, + get_form_for_edit, + get_form_for_view, + get_form_shared_with, + get_link_field_options, + is_login_required, + remove_form_access, + set_form_permission, +) + +__all__ = [ + "add_form_access", + "get_doctype_fields", + "get_doctype_list", + "get_form", + "get_form_by_route", + "get_form_for_edit", + "get_form_for_view", + "get_form_shared_with", + "get_link_field_options", + "is_login_required", + "remove_form_access", + "set_form_permission", +] diff --git a/forms_pro/api/form.py b/forms_pro/api/form/endpoints.py similarity index 74% rename from forms_pro/api/form.py rename to forms_pro/api/form/endpoints.py index cd64e19..e9efd0e 100644 --- a/forms_pro/api/form.py +++ b/forms_pro/api/form/endpoints.py @@ -1,21 +1,13 @@ import frappe from frappe import _ from frappe.share import add_docshare, remove -from pydantic import BaseModel, Field from forms_pro.api.user import get_user from forms_pro.forms_pro.doctype.form.form import Form from forms_pro.utils.constants import FORMS_PRO_SYSTEM_FIELDNAMES, UNSUPPORTED_FRAPPE_FIELDTYPES +from forms_pro.utils.permissions import require_permission - -class FormSharedWithResponse(BaseModel): - full_name: str - user_image: str | None - email: str = Field(alias="user") - read: bool - write: bool - share: bool - submit: bool +from .schema import FormSharedWithResponse @frappe.whitelist(allow_guest=True) # nosemgrep: frappe-semgrep-rules.rules.security.guest-whitelisted-method @@ -65,6 +57,27 @@ def get_form(form_id: str) -> dict: } +@frappe.whitelist() +@require_permission("Form", "read", param="form_id") +def get_form_for_view(form_id: str) -> dict: + """Return the form document for the Manage Form page. + + Requires ``read`` permission on the Form. Returns HTTP 404 when the + form does not exist and HTTP 403 when the user lacks permission. + """ + return get_form(form_id) + + +@frappe.whitelist() +@require_permission("Form", "write", param="form_id") +def get_form_for_edit(form_id: str) -> dict: + """Return the form document for the Edit Form page. + + Requires ``write`` permission on the Form. Returns HTTP 404/403 accordingly. + """ + return get_form(form_id) + + @frappe.whitelist(allow_guest=True) # nosemgrep: frappe-semgrep-rules.rules.security.guest-whitelisted-method def get_link_field_options( doctype: str, @@ -84,19 +97,13 @@ def get_link_field_options( @frappe.whitelist() +@require_permission("Form", "read", param="form_id") def get_form_shared_with(form_id: str) -> list[frappe.Any]: - """ - Get list of users with which a form is shared. + """Get list of users with whom a form is shared. - We validate the current user has read access to the form. + Requires ``read`` permission on the Form (HTTP 403 otherwise, HTTP 404 + when the form does not exist). """ - if not frappe.has_permission( - "Form", - "read", - form_id, - ): - frappe.throw(_("You do not have read access to this form")) - form: Form = frappe.get_doc("Form", form_id) shared_with = form.shared_with() @@ -113,25 +120,22 @@ def get_form_shared_with(form_id: str) -> list[frappe.Any]: @frappe.whitelist() +@require_permission("Form", "write", param="form_id") def remove_form_access(form_id: str, user_email: str) -> None: - """ - Remove access to a form for a user. + """Remove access to a form for a user. - We validate the current user has write access to the form. - - args: - form_id: str - The ID of the form to remove access to. - user_email: str - The email of the user to remove access to. + Requires ``write`` permission on the Form (HTTP 403 otherwise, HTTP 404 + when the form does not exist). + Args: + form_id: The ID of the form to remove access from. + user_email: The email of the user whose access is being removed. """ - - if not frappe.has_permission("Form", "write", form_id): - frappe.throw(_("You do not have write access to this form")) - return remove(doctype="Form", name=form_id, user=user_email, flags={"ignore_permissions": True}) @frappe.whitelist() +@require_permission("Form", "share", param="form_id") def add_form_access( form_id: str, user: str, @@ -140,13 +144,12 @@ def add_form_access( share: bool = False, submit: bool = False, ) -> None: - """ - Grant a user access to a form with the specified permissions. + """Grant a user access to a form with the specified permissions. Uses ``ignore_share_permission`` so the record can be shared regardless of - the caller's role-level DocShare permissions — the explicit - ``frappe.has_permission`` check below enforces that only users with share - access on this particular form can invoke this endpoint. + the caller's role-level DocShare permissions — the ``@require_permission`` + decorator enforces that only users with share access on this particular form + can invoke this endpoint. Args: form_id: Name of the Form document to share. @@ -157,12 +160,9 @@ def add_form_access( submit: Allow the user to submit the form (default False). Raises: - frappe.PermissionError: If the calling user does not have share access - on the specified form. + frappe.PermissionError: HTTP 403, when the caller lacks share access. + frappe.DoesNotExistError: HTTP 404, when the form does not exist. """ - if not frappe.has_permission("Form", "share", form_id): - frappe.throw(_("You do not have share access to this form"), frappe.PermissionError) - add_docshare( doctype="Form", name=form_id, @@ -176,14 +176,14 @@ def add_form_access( @frappe.whitelist() +@require_permission("Form", "share", param="form_id") def set_form_permission( form_id: str, user: str, permission_to: str, value: bool, ) -> None: - """ - Toggle a single permission bit for a user on a form. + """Toggle a single permission bit for a user on a form. Designed for per-toggle updates from the sharing UI — only the specified permission field is changed; all other existing permissions are preserved by @@ -197,14 +197,11 @@ def set_form_permission( value: ``True`` to grant the permission, ``False`` to revoke it. Raises: - frappe.PermissionError: If the calling user does not have share access - on the specified form. + frappe.PermissionError: HTTP 403, when the caller lacks share access. + frappe.DoesNotExistError: HTTP 404, when the form does not exist. frappe.ValidationError: If ``permission_to`` is not a recognised permission type. """ - if not frappe.has_permission("Form", "share", form_id): - frappe.throw(_("You do not have share access to this form"), frappe.PermissionError) - # Guard against arbitrary kwargs being forwarded to add_docshare allowed_permissions = {"read", "write", "share", "submit"} if permission_to not in allowed_permissions: diff --git a/forms_pro/api/form/schema.py b/forms_pro/api/form/schema.py new file mode 100644 index 0000000..8a4fd5e --- /dev/null +++ b/forms_pro/api/form/schema.py @@ -0,0 +1,11 @@ +from pydantic import BaseModel, Field + + +class FormSharedWithResponse(BaseModel): + full_name: str + user_image: str | None + email: str = Field(alias="user") + read: bool + write: bool + share: bool + submit: bool diff --git a/forms_pro/api/form/test_form.py b/forms_pro/api/form/test_form.py new file mode 100644 index 0000000..c88b60c --- /dev/null +++ b/forms_pro/api/form/test_form.py @@ -0,0 +1,193 @@ +import frappe +from frappe.share import add_docshare +from frappe.tests import IntegrationTestCase + +from forms_pro.api.form import ( + add_form_access, + get_form_for_edit, + get_form_for_view, + get_form_shared_with, + remove_form_access, + set_form_permission, +) +from forms_pro.tests import FORMS_PRO_TEST_USER +from forms_pro.tests.factories.form_factory import FormFactory + + +class TestGetFormForView(IntegrationTestCase): + def setUp(self) -> None: + self.form = FormFactory.create() + + def test_returns_form_when_user_has_read(self) -> None: + result = get_form_for_view(form_id=self.form.name) + self.assertEqual(result["name"], self.form.name) + + def test_raises_403_when_user_lacks_read(self) -> None: + with self.set_user(FORMS_PRO_TEST_USER): + with self.assertRaises(frappe.PermissionError) as ctx: + get_form_for_view(form_id=self.form.name) + self.assertEqual(ctx.exception.http_status_code, 403) + + def test_raises_404_when_form_missing(self) -> None: + with self.assertRaises(frappe.DoesNotExistError) as ctx: + get_form_for_view(form_id="MISSING_FORM_XYZ") + self.assertEqual(ctx.exception.http_status_code, 404) + + +class TestGetFormForEdit(IntegrationTestCase): + def setUp(self) -> None: + self.form = FormFactory.create() + + def test_returns_form_when_user_has_write(self) -> None: + result = get_form_for_edit(form_id=self.form.name) + self.assertEqual(result["name"], self.form.name) + + def test_raises_403_when_user_has_read_but_not_write(self) -> None: + # Share read-only with the low-privilege user, then assert write is denied. + add_docshare( + doctype="Form", + name=self.form.name, + user=FORMS_PRO_TEST_USER, + read=1, + write=0, + share=0, + flags={"ignore_share_permission": True}, + ) + with self.set_user(FORMS_PRO_TEST_USER): + with self.assertRaises(frappe.PermissionError) as ctx: + get_form_for_edit(form_id=self.form.name) + self.assertEqual(ctx.exception.http_status_code, 403) + + def test_raises_404_when_form_missing(self) -> None: + with self.assertRaises(frappe.DoesNotExistError) as ctx: + get_form_for_edit(form_id="MISSING_FORM_XYZ") + self.assertEqual(ctx.exception.http_status_code, 404) + + +class TestGetFormSharedWith(IntegrationTestCase): + def setUp(self) -> None: + self.form = FormFactory.create() + + def test_admin_can_list_shares(self) -> None: + result = get_form_shared_with(form_id=self.form.name) + self.assertIsInstance(result, list) + + def test_raises_403_when_user_lacks_read(self) -> None: + with self.set_user(FORMS_PRO_TEST_USER): + with self.assertRaises(frappe.PermissionError) as ctx: + get_form_shared_with(form_id=self.form.name) + self.assertEqual(ctx.exception.http_status_code, 403) + + def test_raises_404_when_form_missing(self) -> None: + with self.assertRaises(frappe.DoesNotExistError) as ctx: + get_form_shared_with(form_id="MISSING_FORM_XYZ") + self.assertEqual(ctx.exception.http_status_code, 404) + + +class TestAddFormAccess(IntegrationTestCase): + def setUp(self) -> None: + self.form = FormFactory.create() + + def test_admin_can_add_access(self) -> None: + add_form_access( + form_id=self.form.name, + user=FORMS_PRO_TEST_USER, + read=True, + ) + self.assertTrue( + frappe.db.exists( + "DocShare", + {"share_doctype": "Form", "share_name": self.form.name, "user": FORMS_PRO_TEST_USER}, + ) + ) + + def test_raises_403_when_user_lacks_share(self) -> None: + with self.set_user(FORMS_PRO_TEST_USER): + with self.assertRaises(frappe.PermissionError) as ctx: + add_form_access(form_id=self.form.name, user="x@y.z", read=True) + self.assertEqual(ctx.exception.http_status_code, 403) + + def test_raises_404_when_form_missing(self) -> None: + with self.assertRaises(frappe.DoesNotExistError) as ctx: + add_form_access(form_id="MISSING_FORM_XYZ", user=FORMS_PRO_TEST_USER, read=True) + self.assertEqual(ctx.exception.http_status_code, 404) + + +class TestRemoveFormAccess(IntegrationTestCase): + def setUp(self) -> None: + self.form = FormFactory.create() + add_docshare( + doctype="Form", + name=self.form.name, + user=FORMS_PRO_TEST_USER, + read=1, + flags={"ignore_share_permission": True}, + ) + + def test_admin_can_remove_access(self) -> None: + remove_form_access(form_id=self.form.name, user_email=FORMS_PRO_TEST_USER) + self.assertFalse( + frappe.db.exists( + "DocShare", + {"share_doctype": "Form", "share_name": self.form.name, "user": FORMS_PRO_TEST_USER}, + ) + ) + + def test_raises_403_when_user_lacks_write(self) -> None: + with self.set_user(FORMS_PRO_TEST_USER): + with self.assertRaises(frappe.PermissionError) as ctx: + remove_form_access(form_id=self.form.name, user_email=FORMS_PRO_TEST_USER) + self.assertEqual(ctx.exception.http_status_code, 403) + + def test_raises_404_when_form_missing(self) -> None: + with self.assertRaises(frappe.DoesNotExistError) as ctx: + remove_form_access(form_id="MISSING_FORM_XYZ", user_email=FORMS_PRO_TEST_USER) + self.assertEqual(ctx.exception.http_status_code, 404) + + +class TestSetFormPermission(IntegrationTestCase): + def setUp(self) -> None: + self.form = FormFactory.create() + add_docshare( + doctype="Form", + name=self.form.name, + user=FORMS_PRO_TEST_USER, + read=1, + flags={"ignore_share_permission": True}, + ) + + def test_admin_can_set_permission(self) -> None: + set_form_permission( + form_id=self.form.name, + user=FORMS_PRO_TEST_USER, + permission_to="write", + value=True, + ) + share = frappe.get_value( + "DocShare", + {"share_doctype": "Form", "share_name": self.form.name, "user": FORMS_PRO_TEST_USER}, + ["write"], + as_dict=True, + ) + self.assertEqual(share["write"], 1) + + def test_raises_403_when_user_lacks_share(self) -> None: + with self.set_user(FORMS_PRO_TEST_USER): + with self.assertRaises(frappe.PermissionError) as ctx: + set_form_permission( + form_id=self.form.name, + user=FORMS_PRO_TEST_USER, + permission_to="write", + value=True, + ) + self.assertEqual(ctx.exception.http_status_code, 403) + + def test_raises_404_when_form_missing(self) -> None: + with self.assertRaises(frappe.DoesNotExistError) as ctx: + set_form_permission( + form_id="MISSING_FORM_XYZ", + user=FORMS_PRO_TEST_USER, + permission_to="write", + value=True, + ) + self.assertEqual(ctx.exception.http_status_code, 404) diff --git a/forms_pro/api/settings/__init__.py b/forms_pro/api/settings/__init__.py new file mode 100644 index 0000000..7b65385 --- /dev/null +++ b/forms_pro/api/settings/__init__.py @@ -0,0 +1,3 @@ +from .endpoints import get_brand_logo, get_website_settings + +__all__ = ["get_brand_logo", "get_website_settings"] diff --git a/forms_pro/api/settings.py b/forms_pro/api/settings/endpoints.py similarity index 100% rename from forms_pro/api/settings.py rename to forms_pro/api/settings/endpoints.py diff --git a/forms_pro/api/submission/__init__.py b/forms_pro/api/submission/__init__.py new file mode 100644 index 0000000..f3fe576 --- /dev/null +++ b/forms_pro/api/submission/__init__.py @@ -0,0 +1,15 @@ +from .endpoints import ( + get_all_submissions, + get_submission, + get_submission_response, + get_user_submissions, + submit_form_response, +) + +__all__ = [ + "get_all_submissions", + "get_submission", + "get_submission_response", + "get_user_submissions", + "submit_form_response", +] diff --git a/forms_pro/api/submission.py b/forms_pro/api/submission/endpoints.py similarity index 86% rename from forms_pro/api/submission.py rename to forms_pro/api/submission/endpoints.py index 73abfdb..392d93d 100644 --- a/forms_pro/api/submission.py +++ b/forms_pro/api/submission/endpoints.py @@ -1,37 +1,16 @@ import json -from datetime import datetime from typing import Any import frappe from frappe import _ from frappe.share import add_docshare -from pydantic import BaseModel, Field, field_validator from forms_pro.forms_pro.doctype.form.form import Form from forms_pro.forms_pro.doctype.form_field.form_field import _DISPLAY_ONLY_FIELDTYPES from forms_pro.utils.form_generator import SubmissionStatus +from forms_pro.utils.permissions import require_permission - -class UserSubmissionResponse(BaseModel): - name: str = Field(description="Name of the submission") - creation: datetime = Field(description="Creation date of the submission") - modified: datetime = Field(description="Last modified date of the submission") - submission_status: str = Field( - description="Status of the submission", - alias="fp_submission_status", - default=SubmissionStatus.SUBMITTED.value, - ) - owner: str = Field(description="Owner of the submission") - - @field_validator("creation", "modified", mode="before") - @classmethod - def parse_datetime(cls, v: Any) -> datetime: - """Convert datetime string to datetime object.""" - if isinstance(v, str): - return frappe.utils.get_datetime(v) - if isinstance(v, datetime): - return v - raise ValueError(f"Invalid datetime value: {v}") +from .schema import UserSubmissionResponse def _coerce_field_value(value: Any, fieldtype: str) -> Any: @@ -200,6 +179,7 @@ def submit_form_response( @frappe.whitelist() +@require_permission("Form", "read", param="form_id") def get_user_submissions(form_id: str) -> list[UserSubmissionResponse]: """ Get the submissions for a user @@ -210,10 +190,6 @@ def get_user_submissions(form_id: str) -> list[UserSubmissionResponse]: Returns: A list of submissions for the user """ - - if frappe.session.user == "Guest": - return [] - form: Form = frappe.get_doc("Form", form_id) linked_doctype = form.linked_doctype @@ -284,6 +260,7 @@ def get_submission(submission_doctype: str, submission_name: str) -> dict[str, A @frappe.whitelist() +@require_permission("Form", "read", param="form_id") def get_all_submissions(form_id: str) -> list[UserSubmissionResponse]: """ Get all submissions for a form @@ -294,17 +271,6 @@ def get_all_submissions(form_id: str) -> list[UserSubmissionResponse]: Returns: A list of submissions for the form """ - linked_team = frappe.db.get_value("Form", form_id, "linked_team_id") - - if not linked_team: - frappe.throw(_("Form not found."), frappe.DoesNotExistError) - - if not frappe.has_permission(doctype="FP Team", ptype="write", doc=linked_team, user=frappe.session.user): - frappe.throw( - _("You do not have permission to read this form's submissions."), - frappe.PermissionError, - ) - form: Form = frappe.get_doc("Form", form_id) linked_doctype = form.linked_doctype diff --git a/forms_pro/api/submission/schema.py b/forms_pro/api/submission/schema.py new file mode 100644 index 0000000..e6f37c5 --- /dev/null +++ b/forms_pro/api/submission/schema.py @@ -0,0 +1,29 @@ +from datetime import datetime +from typing import Any + +import frappe +from pydantic import BaseModel, Field, field_validator + +from forms_pro.utils.form_generator import SubmissionStatus + + +class UserSubmissionResponse(BaseModel): + name: str = Field(description="Name of the submission") + creation: datetime = Field(description="Creation date of the submission") + modified: datetime = Field(description="Last modified date of the submission") + submission_status: str = Field( + description="Status of the submission", + alias="fp_submission_status", + default=SubmissionStatus.SUBMITTED.value, + ) + owner: str = Field(description="Owner of the submission") + + @field_validator("creation", "modified", mode="before") + @classmethod + def parse_datetime(cls, v: Any) -> datetime: + """Convert datetime string to datetime object.""" + if isinstance(v, str): + return frappe.utils.get_datetime(v) + if isinstance(v, datetime): + return v + raise ValueError(f"Invalid datetime value: {v}") diff --git a/forms_pro/api/submission/test_submission.py b/forms_pro/api/submission/test_submission.py new file mode 100644 index 0000000..6139740 --- /dev/null +++ b/forms_pro/api/submission/test_submission.py @@ -0,0 +1,46 @@ +import frappe +from frappe.tests import IntegrationTestCase + +from forms_pro.api.submission import get_all_submissions, get_user_submissions +from forms_pro.tests import FORMS_PRO_TEST_USER +from forms_pro.tests.factories.form_factory import FormFactory + + +class TestGetUserSubmissions(IntegrationTestCase): + def setUp(self) -> None: + self.form = FormFactory.create() + + def test_admin_can_list_user_submissions(self) -> None: + result = get_user_submissions(form_id=self.form.name) + self.assertIsInstance(result, list) + + def test_raises_403_when_user_lacks_read(self) -> None: + with self.set_user(FORMS_PRO_TEST_USER): + with self.assertRaises(frappe.PermissionError) as ctx: + get_user_submissions(form_id=self.form.name) + self.assertEqual(ctx.exception.http_status_code, 403) + + def test_raises_404_when_form_missing(self) -> None: + with self.assertRaises(frappe.DoesNotExistError) as ctx: + get_user_submissions(form_id="MISSING_FORM_XYZ") + self.assertEqual(ctx.exception.http_status_code, 404) + + +class TestGetAllSubmissions(IntegrationTestCase): + def setUp(self) -> None: + self.form = FormFactory.create() + + def test_admin_can_list_all_submissions(self) -> None: + result = get_all_submissions(form_id=self.form.name) + self.assertIsInstance(result, list) + + def test_raises_403_when_user_lacks_read(self) -> None: + with self.set_user(FORMS_PRO_TEST_USER): + with self.assertRaises(frappe.PermissionError) as ctx: + get_all_submissions(form_id=self.form.name) + self.assertEqual(ctx.exception.http_status_code, 403) + + def test_raises_404_when_form_missing(self) -> None: + with self.assertRaises(frappe.DoesNotExistError) as ctx: + get_all_submissions(form_id="MISSING_FORM_XYZ") + self.assertEqual(ctx.exception.http_status_code, 404) diff --git a/forms_pro/tests/test_submission_validation.py b/forms_pro/api/submission/test_submission_validation.py similarity index 98% rename from forms_pro/tests/test_submission_validation.py rename to forms_pro/api/submission/test_submission_validation.py index 610dc31..320f71d 100644 --- a/forms_pro/tests/test_submission_validation.py +++ b/forms_pro/api/submission/test_submission_validation.py @@ -5,7 +5,11 @@ import unittest from types import SimpleNamespace -from forms_pro.api.submission import _coerce_field_value, _evaluate_conditions, _validate_form_response +from forms_pro.api.submission.endpoints import ( + _coerce_field_value, + _evaluate_conditions, + _validate_form_response, +) from forms_pro.forms_pro.doctype.form_field.form_field import _DISPLAY_ONLY_FIELDTYPES from forms_pro.utils.constants import FORMS_PRO_SYSTEM_FIELDNAMES, UNSUPPORTED_FRAPPE_FIELDTYPES from forms_pro.utils.form_generator import LINKED_FORM_FIELDOPTIONS, SUBMISSION_STATUS_FIELDOPTIONS diff --git a/forms_pro/api/team/__init__.py b/forms_pro/api/team/__init__.py new file mode 100644 index 0000000..b64ac86 --- /dev/null +++ b/forms_pro/api/team/__init__.py @@ -0,0 +1,25 @@ +from .endpoints import ( + add_member_to_team_via_invitation, + create_team, + get_team_for_manage, + get_team_forms, + get_team_members, + invite_team_members, + remove_member_from_team, + save, + switch_team, + toggle_can_edit_team, +) + +__all__ = [ + "add_member_to_team_via_invitation", + "create_team", + "get_team_for_manage", + "get_team_forms", + "get_team_members", + "invite_team_members", + "remove_member_from_team", + "save", + "switch_team", + "toggle_can_edit_team", +] diff --git a/forms_pro/api/team.py b/forms_pro/api/team/endpoints.py similarity index 76% rename from forms_pro/api/team.py rename to forms_pro/api/team/endpoints.py index 574795b..96cc253 100644 --- a/forms_pro/api/team.py +++ b/forms_pro/api/team/endpoints.py @@ -5,6 +5,7 @@ from frappe.share import get_share_name from forms_pro.forms_pro.doctype.fp_team.fp_team import FPTeam, GetTeamMembersResponse +from forms_pro.utils.permissions import require_permission from forms_pro.utils.teams import ( GetTeamFormsResponseSchema, set_current_team, @@ -14,6 +15,22 @@ ) +@frappe.whitelist() +@require_permission("FP Team", "read", param="team_id") +def get_team_for_manage(team_id: str) -> dict: + """Return team details for the Manage Team page. + + Requires ``read`` permission on the FP Team. Returns HTTP 404/403 + accordingly. + """ + team: FPTeam = frappe.get_doc("FP Team", team_id) + return { + "name": team.name, + "team_name": team.team_name, + "logo": team.logo, + } + + @frappe.whitelist() def get_team_forms(team_id: str) -> list[GetTeamFormsResponseSchema]: """ @@ -30,21 +47,13 @@ def get_team_forms(team_id: str) -> list[GetTeamFormsResponseSchema]: @frappe.whitelist() +@require_permission("FP Team", "read", param="team_id") def get_team_members(team_id: str) -> list[GetTeamMembersResponse]: - """ - - Get the list of team members in a FP Team. - This endpoint checks if the session user has the permission to read this FP Team DocType + """Get the list of team members in a FP Team. + Requires ``read`` permission on the FP Team (HTTP 403 otherwise, + HTTP 404 when the team does not exist). """ - frappe.has_permission( - doctype="FP Team", - ptype="read", - doc=team_id, - user=frappe.session.user, - throw=True, - ) - # Clear cache so we read fresh DocShare data (e.g. after toggle_can_edit_team) frappe.clear_document_cache("FP Team", team_id) @@ -92,21 +101,13 @@ def switch_team(team_id: str) -> None: @frappe.whitelist(methods=["POST"]) +@require_permission("FP Team", "write", param="team_id") def invite_team_members(team_id: str, emails: list[str]) -> None: - """ - Invite team members to a team - """ - - if not frappe.has_permission( - doctype="FP Team", - ptype="write", - doc=team_id, - user=frappe.session.user, - ): - raise frappe.PermissionError( - "You do not have write permission on this team; write access is required to invite members" - ) + """Invite team members to a team. + Requires ``write`` permission on the FP Team (HTTP 403 otherwise, + HTTP 404 when the team does not exist). + """ emails_str = ", ".join(emails) invite_by_email( @@ -157,21 +158,13 @@ def add_member_to_team_via_invitation(team_id: str, invite_id: str | None = None @frappe.whitelist(methods=["POST"]) +@require_permission("FP Team", "write", param="team_id") def toggle_can_edit_team(team_id: str, member_email: str) -> None: - """ - Toggle the can_edit_team permission for a team member - """ - - if not frappe.has_permission( - doctype="FP Team", - ptype="write", - doc=team_id, - user=frappe.session.user, - ): - raise frappe.PermissionError( - "You do not have permission to toggle the can_edit_team permission for this team member" - ) + """Toggle the can_edit_team permission for a team member. + Requires ``write`` permission on the FP Team (HTTP 403 otherwise, + HTTP 404 when the team does not exist). + """ team: FPTeam = frappe.get_doc("FP Team", team_id) if team.owner == member_email: raise frappe.PermissionError( @@ -191,18 +184,13 @@ def toggle_can_edit_team(team_id: str, member_email: str) -> None: @frappe.whitelist(methods=["POST"]) +@require_permission("FP Team", "write", param="team_id") def save(team_id: str, fields: dict) -> None: - """ - Update team fields. Only fields present in the dict are updated. - """ - frappe.has_permission( - doctype="FP Team", - ptype="write", - doc=team_id, - user=frappe.session.user, - throw=True, - ) + """Update team fields. Only fields present in the dict are updated. + Requires ``write`` permission on the FP Team (HTTP 403 otherwise, + HTTP 404 when the team does not exist). + """ ALLOWED_SAVE_FIELDS = ["team_name", "logo"] team: FPTeam = frappe.get_doc("FP Team", team_id) @@ -214,18 +202,12 @@ def save(team_id: str, fields: dict) -> None: @frappe.whitelist(methods=["POST"]) +@require_permission("FP Team", "write", param="team_id") def remove_member_from_team(team_id: str, member_email: str) -> None: - """ - Remove a member from a team - """ - - if not frappe.has_permission( - doctype="FP Team", - ptype="write", - doc=team_id, - user=frappe.session.user, - ): - raise frappe.PermissionError("You do not have permission to remove a member from this team") + """Remove a member from a team. + Requires ``write`` permission on the FP Team (HTTP 403 otherwise, + HTTP 404 when the team does not exist). + """ team: FPTeam = frappe.get_doc("FP Team", team_id) team.remove_from_team(member_email) diff --git a/forms_pro/tests/test_invitations.py b/forms_pro/api/team/test_invitations.py similarity index 100% rename from forms_pro/tests/test_invitations.py rename to forms_pro/api/team/test_invitations.py diff --git a/forms_pro/api/team/test_team.py b/forms_pro/api/team/test_team.py new file mode 100644 index 0000000..007fb2d --- /dev/null +++ b/forms_pro/api/team/test_team.py @@ -0,0 +1,129 @@ +import frappe +from frappe.tests import IntegrationTestCase + +from forms_pro.api.team import ( + get_team_for_manage, + get_team_members, + invite_team_members, + remove_member_from_team, + save, + toggle_can_edit_team, +) +from forms_pro.tests.factories.fp_team_factory import FPTeamFactory +from forms_pro.tests.factories.user_factory import UserFactory + + +class TestGetTeamForManage(IntegrationTestCase): + def setUp(self) -> None: + self.team = FPTeamFactory.create() + # Fresh user with no Forms Pro role → no doctype-level perm on FP Team. + self.outsider = UserFactory.create() + + def test_returns_team_when_user_has_read(self) -> None: + result = get_team_for_manage(team_id=self.team.name) + self.assertEqual(result["name"], self.team.name) + self.assertEqual(result["team_name"], self.team.team_name) + + def test_raises_403_when_user_lacks_read(self) -> None: + with self.set_user(self.outsider.name): + with self.assertRaises(frappe.PermissionError) as ctx: + get_team_for_manage(team_id=self.team.name) + self.assertEqual(ctx.exception.http_status_code, 403) + + def test_raises_404_when_team_missing(self) -> None: + with self.assertRaises(frappe.DoesNotExistError) as ctx: + get_team_for_manage(team_id="MISSING_TEAM_XYZ") + self.assertEqual(ctx.exception.http_status_code, 404) + + +class TestGetTeamMembers(IntegrationTestCase): + def setUp(self) -> None: + self.team = FPTeamFactory.create() + self.outsider = UserFactory.create() + + def test_admin_can_list_members(self) -> None: + result = get_team_members(team_id=self.team.name) + self.assertIsInstance(result, list) + + def test_raises_403_when_user_lacks_read(self) -> None: + with self.set_user(self.outsider.name): + with self.assertRaises(frappe.PermissionError) as ctx: + get_team_members(team_id=self.team.name) + self.assertEqual(ctx.exception.http_status_code, 403) + + def test_raises_404_when_team_missing(self) -> None: + with self.assertRaises(frappe.DoesNotExistError) as ctx: + get_team_members(team_id="MISSING_TEAM_XYZ") + self.assertEqual(ctx.exception.http_status_code, 404) + + +class TestInviteTeamMembers(IntegrationTestCase): + def setUp(self) -> None: + self.team = FPTeamFactory.create() + self.outsider = UserFactory.create() + + def test_raises_403_when_user_lacks_write(self) -> None: + with self.set_user(self.outsider.name): + with self.assertRaises(frappe.PermissionError) as ctx: + invite_team_members(team_id=self.team.name, emails=["x@y.z"]) + self.assertEqual(ctx.exception.http_status_code, 403) + + def test_raises_404_when_team_missing(self) -> None: + with self.assertRaises(frappe.DoesNotExistError) as ctx: + invite_team_members(team_id="MISSING_TEAM_XYZ", emails=["x@y.z"]) + self.assertEqual(ctx.exception.http_status_code, 404) + + +class TestToggleCanEditTeam(IntegrationTestCase): + def setUp(self) -> None: + self.team = FPTeamFactory.create() + self.outsider = UserFactory.create() + + def test_raises_403_when_user_lacks_write(self) -> None: + with self.set_user(self.outsider.name): + with self.assertRaises(frappe.PermissionError) as ctx: + toggle_can_edit_team(team_id=self.team.name, member_email="x@y.z") + self.assertEqual(ctx.exception.http_status_code, 403) + + def test_raises_404_when_team_missing(self) -> None: + with self.assertRaises(frappe.DoesNotExistError) as ctx: + toggle_can_edit_team(team_id="MISSING_TEAM_XYZ", member_email="x@y.z") + self.assertEqual(ctx.exception.http_status_code, 404) + + +class TestSaveTeam(IntegrationTestCase): + def setUp(self) -> None: + self.team = FPTeamFactory.create() + self.outsider = UserFactory.create() + + def test_admin_can_save_team(self) -> None: + save(team_id=self.team.name, fields={"team_name": "Renamed"}) + self.assertEqual(frappe.db.get_value("FP Team", self.team.name, "team_name"), "Renamed") + + def test_raises_403_when_user_lacks_write(self) -> None: + with self.set_user(self.outsider.name): + with self.assertRaises(frappe.PermissionError) as ctx: + save(team_id=self.team.name, fields={"team_name": "Hijacked"}) + self.assertEqual(ctx.exception.http_status_code, 403) + + def test_raises_404_when_team_missing(self) -> None: + with self.assertRaises(frappe.DoesNotExistError) as ctx: + save(team_id="MISSING_TEAM_XYZ", fields={"team_name": "X"}) + self.assertEqual(ctx.exception.http_status_code, 404) + + +class TestRemoveMemberFromTeam(IntegrationTestCase): + def setUp(self) -> None: + self.team = FPTeamFactory.create() + self.outsider = UserFactory.create() + + def test_raises_403_when_user_lacks_write(self) -> None: + with self.set_user(self.outsider.name): + with self.assertRaises(frappe.PermissionError) as ctx: + remove_member_from_team(team_id=self.team.name, member_email="x@y.z") + self.assertEqual(ctx.exception.http_status_code, 403) + + def test_raises_404_when_team_missing(self) -> None: + with self.assertRaises(frappe.DoesNotExistError) as ctx: + remove_member_from_team(team_id="MISSING_TEAM_XYZ", member_email="x@y.z") + self.assertEqual(ctx.exception.http_status_code, 404) diff --git a/forms_pro/api/user/__init__.py b/forms_pro/api/user/__init__.py new file mode 100644 index 0000000..e81dc8b --- /dev/null +++ b/forms_pro/api/user/__init__.py @@ -0,0 +1,3 @@ +from .endpoints import get_current_user, get_user, get_user_teams + +__all__ = ["get_current_user", "get_user", "get_user_teams"] diff --git a/forms_pro/api/user.py b/forms_pro/api/user/endpoints.py similarity index 56% rename from forms_pro/api/user.py rename to forms_pro/api/user/endpoints.py index 20adfba..d98d11b 100644 --- a/forms_pro/api/user.py +++ b/forms_pro/api/user/endpoints.py @@ -1,40 +1,9 @@ import frappe -from frappe.core.doctype.has_role.has_role import HasRole from frappe.core.doctype.user.user import User -from pydantic import BaseModel, Field, field_validator from forms_pro.utils.teams import get_user_teams as get_user_teams_utils - -class GetUserTeamsResponseSchema(BaseModel): - name: str = Field(description="ID of the team") - team_name: str = Field(description="The name of the team") - logo: str | None = Field(description="Logo of the team") - is_current: bool = Field(description="Whether this is the current team") - - -class GetUserResponseSchema(BaseModel): - email: str - first_name: str - last_name: str | None = None - full_name: str - username: str - desk_theme: str - roles: list[str] - has_desk_access: bool - - @field_validator("roles", mode="before") - @classmethod - def extract_roles(cls, v: list[HasRole]) -> list[str]: - if not v: - return [] - - return [role.role for role in v] - - -class GetUserBasicResponse(BaseModel): - full_name: str - user_image: str | None = None +from .schema import GetUserBasicResponse, GetUserResponseSchema, GetUserTeamsResponseSchema @frappe.whitelist() diff --git a/forms_pro/api/user/schema.py b/forms_pro/api/user/schema.py new file mode 100644 index 0000000..77061ce --- /dev/null +++ b/forms_pro/api/user/schema.py @@ -0,0 +1,33 @@ +from frappe.core.doctype.has_role.has_role import HasRole +from pydantic import BaseModel, Field, field_validator + + +class GetUserTeamsResponseSchema(BaseModel): + name: str = Field(description="ID of the team") + team_name: str = Field(description="The name of the team") + logo: str | None = Field(description="Logo of the team") + is_current: bool = Field(description="Whether this is the current team") + + +class GetUserResponseSchema(BaseModel): + email: str + first_name: str + last_name: str | None = None + full_name: str + username: str + desk_theme: str + roles: list[str] + has_desk_access: bool + + @field_validator("roles", mode="before") + @classmethod + def extract_roles(cls, v: list[HasRole]) -> list[str]: + if not v: + return [] + + return [role.role for role in v] + + +class GetUserBasicResponse(BaseModel): + full_name: str + user_image: str | None = None diff --git a/forms_pro/api/user/test_user.py b/forms_pro/api/user/test_user.py new file mode 100644 index 0000000..5a5a9ea --- /dev/null +++ b/forms_pro/api/user/test_user.py @@ -0,0 +1,43 @@ +""" +Audit coverage for forms_pro.api.user. + +These endpoints gate on session state, not DocShare, so they take no +`@require_permission` decorator. The tests below document current +behavior so future regressions are caught. +""" + +from frappe.tests import IntegrationTestCase + +from forms_pro.api.user import get_current_user, get_user, get_user_teams +from forms_pro.tests import FORMS_PRO_TEST_USER + + +class TestGetUser(IntegrationTestCase): + def test_returns_basic_payload_for_existing_user(self) -> None: + result = get_user(user=FORMS_PRO_TEST_USER) + self.assertIsNotNone(result) + self.assertIn("full_name", result) + self.assertIn("user_image", result) + + def test_returns_none_for_missing_user(self) -> None: + self.assertIsNone(get_user(user="missing_user_xyz@example.com")) + + +class TestGetCurrentUser(IntegrationTestCase): + def test_returns_session_user_payload(self) -> None: + with self.set_user(FORMS_PRO_TEST_USER): + result = get_current_user() + self.assertEqual(result["email"], FORMS_PRO_TEST_USER) + self.assertIn("roles", result) + self.assertIn("has_desk_access", result) + + +class TestGetUserTeams(IntegrationTestCase): + def test_returns_list_for_real_user(self) -> None: + with self.set_user(FORMS_PRO_TEST_USER): + result = get_user_teams() + self.assertIsInstance(result, list) + + def test_returns_empty_for_guest(self) -> None: + with self.set_user("Guest"): + self.assertEqual(get_user_teams(), []) diff --git a/forms_pro/tests/test_form_field.py b/forms_pro/forms_pro/doctype/form_field/test_form_field.py similarity index 100% rename from forms_pro/tests/test_form_field.py rename to forms_pro/forms_pro/doctype/form_field/test_form_field.py diff --git a/forms_pro/forms_pro/doctype/fp_team/fp_team.py b/forms_pro/forms_pro/doctype/fp_team/fp_team.py index ac31f00..5a27f03 100644 --- a/forms_pro/forms_pro/doctype/fp_team/fp_team.py +++ b/forms_pro/forms_pro/doctype/fp_team/fp_team.py @@ -52,7 +52,7 @@ def team_members(self) -> list[GetTeamMembersResponse]: _user = get_user(member.user) _user["email"] = member.user share_name = get_share_name(doctype="FP Team", name=self.name, user=member.user, everyone=0) - _user["can_edit_team"] = frappe.db.get_value("DocShare", share_name, "write") + _user["can_edit_team"] = bool(share_name and frappe.db.get_value("DocShare", share_name, "write")) _user["is_owner"] = self.owner == member.user members.append(GetTeamMembersResponse.model_validate(_user).model_dump()) diff --git a/forms_pro/utils/permissions.py b/forms_pro/utils/permissions.py new file mode 100644 index 0000000..5af1333 --- /dev/null +++ b/forms_pro/utils/permissions.py @@ -0,0 +1,46 @@ +import functools +from collections.abc import Callable + +import frappe +from frappe import _ + + +def require_permission(doctype: str, ptype: str = "read", param: str = "name") -> Callable: + """Enforce frappe.has_permission(doctype, ptype, kwargs[param]) before fn runs. + + Raises: + frappe.DoesNotExistError (HTTP 404): when ptype != "create" and the + referenced document does not exist. + frappe.PermissionError (HTTP 403): when the user lacks the permission. + + Args: + doctype: DocType to check against. + ptype: Permission type ("read", "write", "share", "delete", "create"). + param: Keyword-argument name carrying the docname. Ignored for "create". + """ + + def decorator(fn: Callable) -> Callable: + @functools.wraps(fn) + def wrapper(**kwargs): + doc_name = kwargs.get(param) if ptype != "create" else None + + if ptype != "create" and doc_name and not frappe.db.exists(doctype, doc_name): + frappe.throw( + _("{0} {1} not found").format(_(doctype), doc_name), + frappe.DoesNotExistError, + title=_("Not Found"), + ) + + allowed = frappe.has_permission(doctype=doctype, ptype=ptype, doc=doc_name) + if not allowed: + frappe.throw( + _("You do not have {0} permission on {1}").format(_(ptype), _(doctype)), + frappe.PermissionError, + title=_("Access Denied"), + ) + + return fn(**kwargs) + + return wrapper + + return decorator diff --git a/forms_pro/utils/test_permissions.py b/forms_pro/utils/test_permissions.py new file mode 100644 index 0000000..700d3c7 --- /dev/null +++ b/forms_pro/utils/test_permissions.py @@ -0,0 +1,51 @@ +import frappe +from frappe.tests import IntegrationTestCase + +from forms_pro.tests import FORMS_PRO_TEST_USER +from forms_pro.tests.factories.form_factory import FormFactory +from forms_pro.utils.permissions import require_permission + + +class TestRequirePermission(IntegrationTestCase): + def setUp(self) -> None: + self.form = FormFactory.create() + + def test_allows_when_permission_granted(self) -> None: + @require_permission("Form", "read", param="form_id") + def fn(form_id: str) -> str: + return form_id + + self.assertEqual(fn(form_id=self.form.name), self.form.name) + + def test_raises_permission_error_when_denied(self) -> None: + @require_permission("Form", "write", param="form_id") + def fn(form_id: str) -> str: + return form_id + + with self.set_user(FORMS_PRO_TEST_USER): + with self.assertRaises(frappe.PermissionError) as ctx: + fn(form_id=self.form.name) + self.assertEqual(ctx.exception.http_status_code, 403) + + def test_raises_does_not_exist_when_doc_missing(self) -> None: + @require_permission("Form", "read", param="form_id") + def fn(form_id: str) -> str: + return form_id + + with self.assertRaises(frappe.DoesNotExistError) as ctx: + fn(form_id="NON_EXISTENT_FORM_XYZ") + self.assertEqual(ctx.exception.http_status_code, 404) + + def test_create_ptype_skips_existence_and_docname(self) -> None: + @require_permission("Form", "create") + def fn() -> str: + return "ok" + + self.assertEqual(fn(), "ok") + + def test_param_kwarg_routing(self) -> None: + @require_permission("Form", "read", param="my_id") + def fn(my_id: str) -> str: + return my_id + + self.assertEqual(fn(my_id=self.form.name), self.form.name) diff --git a/frontend/e2e/specs/route-perms.spec.ts b/frontend/e2e/specs/route-perms.spec.ts new file mode 100644 index 0000000..9a59717 --- /dev/null +++ b/frontend/e2e/specs/route-perms.spec.ts @@ -0,0 +1,55 @@ +import { test, expect } from "../fixtures/test-data.fixture"; + +test.describe("Route-level permissions", () => { + test("bogus form id renders Not Found and surfaces 404", async ({ page }) => { + const responsePromise = page.waitForResponse( + (res) => + res + .url() + .includes("/api/method/forms_pro.api.form.get_form_for_view") && + res.status() === 404, + { timeout: 15000 } + ); + + await page.goto("/forms/manage/MISSING-FORM-XYZ-E2E"); + + const response = await responsePromise; + expect(response.status()).toBe(404); + + await expect(page.getByRole("heading", { name: /not found/i })).toBeVisible( + { timeout: 10000 } + ); + }); + + test("owner navigates to edit-form and the builder renders", async ({ + page, + createForm, + }) => { + const formId = await createForm(); + + const responsePromise = page.waitForResponse( + (res) => + res + .url() + .includes("/api/method/forms_pro.api.form.get_form_for_edit") && + res.status() === 200, + { timeout: 15000 } + ); + + await page.goto(`/forms/edit-form/${formId}`); + + const response = await responsePromise; + expect(response.status()).toBe(200); + + // RouteError must not render for an authorized owner. + await expect( + page.getByRole("heading", { name: /access denied|not found/i }) + ).toHaveCount(0); + }); + + // Deferred: read-only viewer hitting /edit-form/:id should see "Access Denied" + // plus a 403 in the network. Reliable setup needs an admin-owned form shared + // read-only with the test user, which requires a second authenticated API + // context (admin login) that the current fixture does not provide. Add when + // we wire an admin fixture into e2e/global-setup. +}); diff --git a/frontend/src/App.vue b/frontend/src/App.vue index 511fbf0..ad6f7dd 100644 --- a/frontend/src/App.vue +++ b/frontend/src/App.vue @@ -2,6 +2,7 @@ import "vue-sonner/style.css"; import { Toaster } from "vue-sonner"; import GlobalDialog from "@/components/ui/GlobalDialog.vue"; +import RouteProgress from "@/components/RouteProgress.vue"; import { useUser } from "@/stores/user"; const userStore = useUser(); @@ -12,6 +13,7 @@ userStore.initialize();
HTTP {{ httpStatus }}
+ +