From 485d37f24032d7821584f5bc95f4b45e5f1ed882 Mon Sep 17 00:00:00 2001 From: xingxi0614-cpu Date: Thu, 28 May 2026 16:13:55 +0800 Subject: [PATCH 1/3] Reject control chars in public search queries --- app/activity.py | 13 ++++++++-- app/bounty_api.py | 45 +++++++++++++++++++-------------- app/mcp_tools.py | 5 ++++ tests/test_activity.py | 16 ++++++++++++ tests/test_api_mcp.py | 1 + tests/test_bounty_api_routes.py | 25 ++++++++++++++++++ tests/test_bounty_pages.py | 3 +++ 7 files changed, 87 insertions(+), 21 deletions(-) diff --git a/app/activity.py b/app/activity.py index 4cb3dce5..c91a9c9a 100644 --- a/app/activity.py +++ b/app/activity.py @@ -2,7 +2,7 @@ from typing import Any -from fastapi import FastAPI, Query, Request +from fastapi import FastAPI, HTTPException, Query, Request from fastapi.responses import HTMLResponse from fastapi.templating import Jinja2Templates from sqlalchemy.orm import Session @@ -11,8 +11,17 @@ from app.serializers import activity_to_dict +def _normalized_activity_search_query(query: str | None) -> str | None: + if query is None: + return None + normalized_query = query.strip() + if any(ord(char) < 32 or ord(char) == 127 for char in normalized_query): + raise HTTPException(status_code=400, detail="q must not contain control characters") + return normalized_query + + def activity_context(session: Session, query: str | None = None) -> dict[str, Any]: - return activity_to_dict(session, query) + return activity_to_dict(session, _normalized_activity_search_query(query)) def register_activity_routes(app: FastAPI, *, db_url: str, templates: Jinja2Templates) -> None: diff --git a/app/bounty_api.py b/app/bounty_api.py index 04f2365c..3db1643e 100644 --- a/app/bounty_api.py +++ b/app/bounty_api.py @@ -81,6 +81,14 @@ def _ledger_http_error(exc: LedgerError) -> HTTPException: return HTTPException(status_code=400, detail=detail) +def _normalized_bounty_search_query(query_text: str | None) -> str | None: + if query_text is None: + return None + if CONTROL_CHAR_RE.search(query_text): + raise HTTPException(status_code=400, detail="q must not contain control characters") + return query_text.strip() + + def register_bounty_api_routes( app: FastAPI, *, @@ -114,25 +122,24 @@ def _list_bounties_by_status( status_code=400, detail="status must be one of: open, paid, closed" ) query = query.where(Bounty.status == normalized_status) - if query_text is not None: - normalized_query = query_text.strip() - if normalized_query: - escaped_query = ( - normalized_query.lower() - .replace("\\", "\\\\") - .replace("%", "\\%") - .replace("_", "\\_") - ) - like_query = f"%{escaped_query}%" - issue_number = issue_number_search_value(normalized_query) - text_filter = or_( - func.lower(Bounty.repo).like(like_query, escape="\\"), - func.lower(Bounty.title).like(like_query, escape="\\"), - func.lower(Bounty.acceptance).like(like_query, escape="\\"), - ) - if issue_number is not None: - text_filter = or_(text_filter, Bounty.issue_number == issue_number) - query = query.where(text_filter) + normalized_query = _normalized_bounty_search_query(query_text) + if normalized_query: + escaped_query = ( + normalized_query.lower() + .replace("\\", "\\\\") + .replace("%", "\\%") + .replace("_", "\\_") + ) + like_query = f"%{escaped_query}%" + issue_number = issue_number_search_value(normalized_query) + text_filter = or_( + func.lower(Bounty.repo).like(like_query, escape="\\"), + func.lower(Bounty.title).like(like_query, escape="\\"), + func.lower(Bounty.acceptance).like(like_query, escape="\\"), + ) + if issue_number is not None: + text_filter = or_(text_filter, Bounty.issue_number == issue_number) + query = query.where(text_filter) bounties = session.scalars(query.order_by(Bounty.id.desc())).all() sorted_bounties = sort_bounties( [bounty_to_dict(bounty) for bounty in bounties], normalized_sort diff --git a/app/mcp_tools.py b/app/mcp_tools.py index 0a0bd371..f225d4ae 100644 --- a/app/mcp_tools.py +++ b/app/mcp_tools.py @@ -27,6 +27,10 @@ def call_mcp_tool(database_url: str, name: str, args: dict[str, Any]) -> str | dict[str, Any]: + def reject_control_chars(field: str, value: str) -> None: + if any(ord(char) < 32 or ord(char) == 127 for char in value): + raise ValueError(f"{field} must not contain control characters") + def int_arg(field: str) -> int: value = args[field] if isinstance(value, bool): @@ -123,6 +127,7 @@ def optional_bool_arg(field: str, default: bool = False) -> bool: query = select(Bounty).where(Bounty.status == normalized_status) query_text = optional_clean_str_arg("q") if query_text: + reject_control_chars("q", query_text) escaped_query = ( query_text.lower().replace("\\", "\\\\").replace("%", "\\%").replace("_", "\\_") ) diff --git a/tests/test_activity.py b/tests/test_activity.py index e3835245..03af9ab1 100644 --- a/tests/test_activity.py +++ b/tests/test_activity.py @@ -192,6 +192,22 @@ def test_activity_api_filters_accepted_work_by_query(sqlite_url: str) -> None: assert invalid_hash_query["recent"] == [] +def test_activity_rejects_control_character_search_query(sqlite_url: str) -> None: + create_schema(sqlite_url) + with session_scope(sqlite_url) as session: + ensure_genesis(session) + + client = TestClient(create_app(database_url=sqlite_url, webhook_secret="secret")) + + api_response = client.get("/api/v1/activity", params={"q": "\x00"}) + page_response = client.get("/activity", params={"q": "\x00"}) + + assert api_response.status_code == 400 + assert api_response.json() == {"detail": "q must not contain control characters"} + assert page_response.status_code == 400 + assert page_response.json() == {"detail": "q must not contain control characters"} + + def test_activity_page_renders_empty_and_paid_states(sqlite_url: str) -> None: create_schema(sqlite_url) client = TestClient(create_app(database_url=sqlite_url, webhook_secret="secret")) diff --git a/tests/test_api_mcp.py b/tests/test_api_mcp.py index 83438645..83f94fc3 100644 --- a/tests/test_api_mcp.py +++ b/tests/test_api_mcp.py @@ -540,6 +540,7 @@ def test_mcp_list_bounties_honors_sort_argument(sqlite_url: str) -> None: ({"limit": 0}, 34), ({"limit": 101}, 35), ({"sort": "invalid"}, 36), + ({"q": "\x00"}, 37), ], ) def test_mcp_list_bounties_rejects_invalid_filters( diff --git a/tests/test_bounty_api_routes.py b/tests/test_bounty_api_routes.py index 71a74415..88fc4eab 100644 --- a/tests/test_bounty_api_routes.py +++ b/tests/test_bounty_api_routes.py @@ -313,3 +313,28 @@ def test_bounty_api_limit_rejects_out_of_range_values(sqlite_url: str) -> None: assert client.get("/api/v1/bounties?limit=201").status_code == 422 assert client.get("/api/v1/bounties/summary?limit=0").status_code == 422 assert client.get("/api/v1/bounties/summary?limit=201").status_code == 422 + + +def test_bounty_api_rejects_control_character_search_queries(sqlite_url: str) -> None: + create_schema(sqlite_url) + with session_scope(sqlite_url) as session: + ensure_genesis(session) + create_bounty( + session, + repo="ramimbo/mergework", + issue_number=53, + issue_url="https://github.com/ramimbo/mergework/issues/53", + title="Control character bounty search", + reward_mrwk="5", + acceptance="Control characters should not widen bounty search results.", + ) + + client = TestClient(create_app(database_url=sqlite_url, webhook_secret="secret")) + + list_response = client.get("/api/v1/bounties?q=%00") + summary_response = client.get("/api/v1/bounties/summary?q=%00") + + assert list_response.status_code == 400 + assert list_response.json()["detail"] == "q must not contain control characters" + assert summary_response.status_code == 400 + assert summary_response.json()["detail"] == "q must not contain control characters" diff --git a/tests/test_bounty_pages.py b/tests/test_bounty_pages.py index 55b9f8bb..f1094cb6 100644 --- a/tests/test_bounty_pages.py +++ b/tests/test_bounty_pages.py @@ -282,6 +282,9 @@ def test_bounties_page_and_api_search_by_text_and_issue_number(sqlite_url: str) assert backslash_search.status_code == 200 assert [row["issue_number"] for row in backslash_search.json()] == [66] + control_char_page_search = client.get("/bounties", params={"q": "\x00"}) + assert control_char_page_search.status_code == 400 + def test_bounties_page_and_api_sort_public_rows(sqlite_url: str) -> None: create_schema(sqlite_url) From 50d4daab3b365083bafe8316c26fb4d59e504d31 Mon Sep 17 00:00:00 2001 From: xingxi0614-cpu Date: Fri, 29 May 2026 14:43:53 +0800 Subject: [PATCH 2/3] fix: reject raw control chars in search queries --- app/activity.py | 5 ++--- app/mcp_tools.py | 13 +++++++++++-- tests/test_activity.py | 6 ++++++ tests/test_api_mcp.py | 2 ++ tests/test_bounty_api_routes.py | 6 ++++++ tests/test_bounty_pages.py | 5 +++++ 6 files changed, 32 insertions(+), 5 deletions(-) diff --git a/app/activity.py b/app/activity.py index c91a9c9a..324915f7 100644 --- a/app/activity.py +++ b/app/activity.py @@ -14,10 +14,9 @@ def _normalized_activity_search_query(query: str | None) -> str | None: if query is None: return None - normalized_query = query.strip() - if any(ord(char) < 32 or ord(char) == 127 for char in normalized_query): + if any(ord(char) < 32 or ord(char) == 127 for char in query): raise HTTPException(status_code=400, detail="q must not contain control characters") - return normalized_query + return query.strip() def activity_context(session: Session, query: str | None = None) -> dict[str, Any]: diff --git a/app/mcp_tools.py b/app/mcp_tools.py index f225d4ae..fc03aa66 100644 --- a/app/mcp_tools.py +++ b/app/mcp_tools.py @@ -83,6 +83,16 @@ def optional_clean_str_arg(field: str) -> str | None: clean = value.strip() return clean or None + def optional_search_query_arg(field: str) -> str | None: + value = args.get(field) + if value is None: + return None + if not isinstance(value, str): + raise ValueError(f"{field} must be a string") + reject_control_chars(field, value) + clean = value.strip() + return clean or None + def output_format_arg() -> str: value = args.get("format", "text") if value is None: @@ -125,9 +135,8 @@ def optional_bool_arg(field: str, default: bool = False) -> bool: if normalized_status not in {"open", "paid", "closed"}: raise ValueError("status must be one of: open, paid, closed") query = select(Bounty).where(Bounty.status == normalized_status) - query_text = optional_clean_str_arg("q") + query_text = optional_search_query_arg("q") if query_text: - reject_control_chars("q", query_text) escaped_query = ( query_text.lower().replace("\\", "\\\\").replace("%", "\\%").replace("_", "\\_") ) diff --git a/tests/test_activity.py b/tests/test_activity.py index 03af9ab1..68bc12d7 100644 --- a/tests/test_activity.py +++ b/tests/test_activity.py @@ -201,11 +201,17 @@ def test_activity_rejects_control_character_search_query(sqlite_url: str) -> Non api_response = client.get("/api/v1/activity", params={"q": "\x00"}) page_response = client.get("/activity", params={"q": "\x00"}) + leading_tab_response = client.get("/api/v1/activity", params={"q": "\talice"}) + trailing_newline_response = client.get("/activity", params={"q": "alice\n"}) assert api_response.status_code == 400 assert api_response.json() == {"detail": "q must not contain control characters"} assert page_response.status_code == 400 assert page_response.json() == {"detail": "q must not contain control characters"} + assert leading_tab_response.status_code == 400 + assert leading_tab_response.json() == {"detail": "q must not contain control characters"} + assert trailing_newline_response.status_code == 400 + assert trailing_newline_response.json() == {"detail": "q must not contain control characters"} def test_activity_page_renders_empty_and_paid_states(sqlite_url: str) -> None: diff --git a/tests/test_api_mcp.py b/tests/test_api_mcp.py index 83f94fc3..6d21dde8 100644 --- a/tests/test_api_mcp.py +++ b/tests/test_api_mcp.py @@ -541,6 +541,8 @@ def test_mcp_list_bounties_honors_sort_argument(sqlite_url: str) -> None: ({"limit": 101}, 35), ({"sort": "invalid"}, 36), ({"q": "\x00"}, 37), + ({"q": "\tDocs"}, 38), + ({"q": "Docs\n"}, 39), ], ) def test_mcp_list_bounties_rejects_invalid_filters( diff --git a/tests/test_bounty_api_routes.py b/tests/test_bounty_api_routes.py index 88fc4eab..51f3885f 100644 --- a/tests/test_bounty_api_routes.py +++ b/tests/test_bounty_api_routes.py @@ -333,8 +333,14 @@ def test_bounty_api_rejects_control_character_search_queries(sqlite_url: str) -> list_response = client.get("/api/v1/bounties?q=%00") summary_response = client.get("/api/v1/bounties/summary?q=%00") + leading_tab_response = client.get("/api/v1/bounties", params={"q": "\tControl"}) + trailing_newline_response = client.get("/api/v1/bounties/summary", params={"q": "Control\n"}) assert list_response.status_code == 400 assert list_response.json()["detail"] == "q must not contain control characters" assert summary_response.status_code == 400 assert summary_response.json()["detail"] == "q must not contain control characters" + assert leading_tab_response.status_code == 400 + assert leading_tab_response.json()["detail"] == "q must not contain control characters" + assert trailing_newline_response.status_code == 400 + assert trailing_newline_response.json()["detail"] == "q must not contain control characters" diff --git a/tests/test_bounty_pages.py b/tests/test_bounty_pages.py index f1094cb6..eee8312c 100644 --- a/tests/test_bounty_pages.py +++ b/tests/test_bounty_pages.py @@ -284,6 +284,11 @@ def test_bounties_page_and_api_search_by_text_and_issue_number(sqlite_url: str) control_char_page_search = client.get("/bounties", params={"q": "\x00"}) assert control_char_page_search.status_code == 400 + assert control_char_page_search.json()["detail"] == "q must not contain control characters" + + leading_tab_page_search = client.get("/bounties", params={"q": "\tDocs"}) + assert leading_tab_page_search.status_code == 400 + assert leading_tab_page_search.json()["detail"] == "q must not contain control characters" def test_bounties_page_and_api_sort_public_rows(sqlite_url: str) -> None: From 1165d5fc624e0711761ad00a7733f3308264e26c Mon Sep 17 00:00:00 2001 From: xingxi0614-cpu Date: Fri, 29 May 2026 15:46:10 +0800 Subject: [PATCH 3/3] test: cover DEL in bounty search validation --- tests/test_bounty_api_routes.py | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/tests/test_bounty_api_routes.py b/tests/test_bounty_api_routes.py index 51f3885f..47438f23 100644 --- a/tests/test_bounty_api_routes.py +++ b/tests/test_bounty_api_routes.py @@ -333,6 +333,8 @@ def test_bounty_api_rejects_control_character_search_queries(sqlite_url: str) -> list_response = client.get("/api/v1/bounties?q=%00") summary_response = client.get("/api/v1/bounties/summary?q=%00") + del_list_response = client.get("/api/v1/bounties", params={"q": "\x7f"}) + del_summary_response = client.get("/api/v1/bounties/summary", params={"q": "\x7f"}) leading_tab_response = client.get("/api/v1/bounties", params={"q": "\tControl"}) trailing_newline_response = client.get("/api/v1/bounties/summary", params={"q": "Control\n"}) @@ -340,6 +342,10 @@ def test_bounty_api_rejects_control_character_search_queries(sqlite_url: str) -> assert list_response.json()["detail"] == "q must not contain control characters" assert summary_response.status_code == 400 assert summary_response.json()["detail"] == "q must not contain control characters" + assert del_list_response.status_code == 400 + assert del_list_response.json()["detail"] == "q must not contain control characters" + assert del_summary_response.status_code == 400 + assert del_summary_response.json()["detail"] == "q must not contain control characters" assert leading_tab_response.status_code == 400 assert leading_tab_response.json()["detail"] == "q must not contain control characters" assert trailing_newline_response.status_code == 400