From 710f605becffd75939a5728bf4a908ef6da61a06 Mon Sep 17 00:00:00 2001 From: Hyrin-mansoor Date: Tue, 12 May 2026 08:37:30 +0300 Subject: [PATCH 1/2] refactor: clean up code by removing unused functions and improving path handling; enhance error messages --- changai/changai/api/v2/auto_gen_api.py | 66 ------------------- .../api/v2/build_cards_faiss_index_v2.py | 20 +++--- changai/changai/api/v2/format_output.py | 2 +- changai/changai/api/v2/non_erp_handler.py | 8 +-- changai/changai/api/v2/schema_utils.py | 6 +- .../changai/api/v2/text2sql_pipeline_v2.py | 50 ++++---------- 6 files changed, 30 insertions(+), 122 deletions(-) diff --git a/changai/changai/api/v2/auto_gen_api.py b/changai/changai/api/v2/auto_gen_api.py index a7d2dfc..5fb74f9 100644 --- a/changai/changai/api/v2/auto_gen_api.py +++ b/changai/changai/api/v2/auto_gen_api.py @@ -286,9 +286,6 @@ def sync_master_data_smart() -> Dict[str, Any]: settings.save(ignore_permissions=True) payload_out = {"_meta": meta, "data": final_data} file_doc = write_filedoctype(file_name, payload_out, folder=RAG_FOLDER) - - frappe.db.commit() - return { "ok": True, "message": _("Master data sync complete."), @@ -316,69 +313,6 @@ def _clean_schema_fields(by_table: Dict[str, Dict[str, Any]]) -> None: field.pop("join_hint", None) -@frappe.whitelist(allow_guest=True) -def test(): - payload = _read_filedoctype("schema.yaml", "Home/RAG Sources") - tables = [ - table.get("table") - for table in payload.get("tables", []) - if isinstance(table, dict) and table.get("table") - ] - - write_filedoctype( - "tables.json", - tables, - folder="Home/RAG Sources" - ) - - print("Tables count:", len(tables)) - return len(tables) - # payload = _read_filedoctype("schema.yaml", "Home/RAG Sources") - - # valid_modules = set(erpnext_modules) - - # clean_tables = [] - - # for block in payload.get("tables", []): - # table = block.get("table") - # if not table: - # continue - - # dt = _strip_tab(table) - - # doc = frappe.get_value("DocType", dt, ["module", "custom"], as_dict=True) - - # if not doc: - # continue - - # # keep only standard ERPNext doctypes (not custom) - # if doc.custom: - # continue - - # # keep only ERPNext modules - # if doc.get("module") not in erpnext_modules: - # continue - - # clean_tables.append(block) - - # payload["tables"] = clean_tables - - # write_filedoctype( - # "schema.yaml", - # payload, - # folder="Home/RAG Sources" - # ) - - # print("Cleaned tables:", len(clean_tables)) - # tables = [t["table"] for t in payload["tables"]] - - # write_filedoctype( - # "tables.json", - # tables, - # folder="Home/RAG Sources" - # ) - # return len(clean_tables) - @frappe.whitelist(allow_guest=False) def get_doctypes_changed_since(last_sync: Optional[str]) -> List[str]: app_names=["erpnext","frappe"] diff --git a/changai/changai/api/v2/build_cards_faiss_index_v2.py b/changai/changai/api/v2/build_cards_faiss_index_v2.py index e2c4909..5fab8be 100644 --- a/changai/changai/api/v2/build_cards_faiss_index_v2.py +++ b/changai/changai/api/v2/build_cards_faiss_index_v2.py @@ -10,6 +10,7 @@ from langchain_community.vectorstores import FAISS from changai.changai.api.v2.text2sql_pipeline_v2 import get_embedding_engine import os +import pickle def get_app_fvs_base(): return os.path.join( @@ -193,7 +194,7 @@ def _build_field_document(table_name: str, module: str, field_row: Dict[str, Any 'creation', 'modified', 'owner', 'parenttype','old_parent', 'parentfield', 'parent', 'idx', 'name', 'docstatus' } -@frappe.whitelist(allow_guest=True) + def clean_schema(schema: Dict[str, Any], output_path: str): tables = schema.get("tables", []) @@ -205,7 +206,7 @@ def clean_schema(schema: Dict[str, Any], output_path: str): if field.get("name") not in GENERIC_FIELDS ] - with open(output_path, "w") as f: + with open(output_path, "w") as f: # nosemgrep: output_path is derived from frappe.get_app_path, not user input yaml.dump(schema, f, allow_unicode=True, sort_keys=False) print(f"Cleaned schema written to {output_path}") @@ -372,9 +373,7 @@ def build_table_fvs_job(): frappe.log_error(frappe.get_traceback(), "Build Table FVS Failed") raise -import os -import pickle -import numpy as np + def save_field_matrix(schema_docs, base_dir): emb = get_embedding_engine() @@ -389,23 +388,22 @@ def save_field_matrix(schema_docs, base_dir): ) table_to_idx = {} - for i, d in enumerate(schema_docs): meta = getattr(d, "metadata", {}) or {} table = meta.get("table") field = meta.get("field") - if table and field: table_to_idx.setdefault(table, []).append(i) - os.makedirs(base_dir, exist_ok=True) + safe_dir = _assert_dir_inside_base(base_dir, get_app_fvs_base()) # validates path + safe_dir.mkdir(parents=True, exist_ok=True) - np.save(os.path.join(base_dir, "field_embs.npy"), embs) + np.save(safe_dir / "field_embs.npy", embs) - with open(os.path.join(base_dir, "field_docs.pkl"), "wb") as f: + with open(safe_dir / "field_docs.pkl", "wb") as f: # nosemgrep: path validated by _assert_dir_inside_base pickle.dump(schema_docs, f) - with open(os.path.join(base_dir, "table_to_idx.pkl"), "wb") as f: + with open(safe_dir / "table_to_idx.pkl", "wb") as f: # nosemgrep: path validated by _assert_dir_inside_base pickle.dump(table_to_idx, f) diff --git a/changai/changai/api/v2/format_output.py b/changai/changai/api/v2/format_output.py index 6ba72e5..b162c55 100644 --- a/changai/changai/api/v2/format_output.py +++ b/changai/changai/api/v2/format_output.py @@ -584,7 +584,7 @@ def format_sql_response( } @frappe.whitelist(allow_guest=False) -def local_format(sql, sample_rows): +def local_format(sql: str, sample_rows: List[Dict[str, Any]]): row_count = len(sample_rows) result = format_sql_response(sql, row_count, sample_rows) return result diff --git a/changai/changai/api/v2/non_erp_handler.py b/changai/changai/api/v2/non_erp_handler.py index 900836a..1d8aabf 100644 --- a/changai/changai/api/v2/non_erp_handler.py +++ b/changai/changai/api/v2/non_erp_handler.py @@ -36,7 +36,7 @@ def __init__(self, json_file: str, alias_path: str): self._arabic_detect_re = re.compile(r"[\u0600-\u06FF]") t1 = time.time() - with open(alias_path, "r", encoding="utf-8") as f: + with open(alias_path, "r", encoding="utf-8") as f: # nosemgrep: path is from frappe.get_app_path, validated in __init__ alias_map = json.load(f) print(f"[non_erp] alias json load: {time.time() - t1:.4f}s") @@ -124,7 +124,7 @@ def _build_from_json(self) -> None: self.responses_by_key.clear() self.keys.clear() - with open(self.json_file, "r", encoding="utf-8") as f: + with open(self.json_file, "r", encoding="utf-8") as f: # nosemgrep: path is from frappe.get_app_path, validated in __init__ rows = json.load(f) processed_rows = [] @@ -175,7 +175,7 @@ def _write_pickle_cache(self, cache_path: str) -> None: if rows is None: return - with open(cache_path, "wb") as f: + with open(cache_path, "wb") as f: # nosemgrep: cache_path derived from self.json_file, validated in __init__ pickle.dump(rows, f, protocol=pickle.HIGHEST_PROTOCOL) def _load_from_pickle(self, cache_path: str) -> None: @@ -183,7 +183,7 @@ def _load_from_pickle(self, cache_path: str) -> None: self.responses_by_key.clear() self.keys.clear() - with open(cache_path, "rb") as f: + with open(cache_path, "rb") as f: # nosemgrep: cache_path derived from self.json_file, validated in __init__ rows = pickle.load(f) for row in rows: diff --git a/changai/changai/api/v2/schema_utils.py b/changai/changai/api/v2/schema_utils.py index 146f709..8ee4269 100644 --- a/changai/changai/api/v2/schema_utils.py +++ b/changai/changai/api/v2/schema_utils.py @@ -9,6 +9,7 @@ from typing import Any, Dict, List, Tuple, Union, Optional, Set import yaml from frappe.utils import getdate +from frappe import _ from pathlib import Path def _safe_join(base: Path, rel: str) -> Path: @@ -145,14 +146,14 @@ def validate_sql_schema(sql: str, dialect: str = "mysql") -> dict: ] @frappe.whitelist(allow_guest=False) -def check_file_updates(file_name=None): +def check_file_updates(file_name :str): settings = frappe.get_single("ChangAI Settings") if file_name == "master_data.yaml": last_sync = settings.last_masterdata_sync elif file_name == "schema.yaml": last_sync = settings.last_schema_sync else: - frappe.throw("Invalid file_name") + frappe.throw(_("Invalid file_name")) if not last_sync: return { @@ -261,7 +262,6 @@ def convert_yaml_schema_to_sqlglot_meta() -> dict: "message": str(e) } -from frappe import _ @frappe.whitelist(allow_guest=False) def test(): res=check_file_updates("master_data.yaml") diff --git a/changai/changai/api/v2/text2sql_pipeline_v2.py b/changai/changai/api/v2/text2sql_pipeline_v2.py index 9dd2b56..512a7bc 100644 --- a/changai/changai/api/v2/text2sql_pipeline_v2.py +++ b/changai/changai/api/v2/text2sql_pipeline_v2.py @@ -352,23 +352,21 @@ def load_field_matrix(): if _FIELD_DOCS_CACHE is not None: return _FIELD_DOCS_CACHE, _FIELD_EMBS_CACHE, _TABLE_TO_IDX_CACHE - app_root = frappe.get_app_path("changai") - schema_path = os.path.join( - app_root, - "changai", "api", "v2", "fvs_stores", "erpnext", "emb_dir" - ) + app_root = Path(frappe.get_app_path("changai")).resolve() + schema_rel = "changai/api/v2/fvs_stores/erpnext/emb_dir" + schema_path = _safe_join(app_root, schema_rel) - embs_path = os.path.join(schema_path, "field_embs.npy") - docs_path = os.path.join(schema_path, "field_docs.pkl") - table_idx_path = os.path.join(schema_path, "table_to_idx.pkl") + embs_path = schema_path / "field_embs.npy" + docs_path = schema_path / "field_docs.pkl" + table_idx_path = schema_path / "table_to_idx.pkl" - if not os.path.exists(embs_path): + if not embs_path.exists(): frappe.throw(f"Missing field_embs.npy. Rebuild schema FVS first: {embs_path}") - with open(docs_path, "rb") as f: + with open(docs_path, "rb") as f: # nosemgrep: path validated by _safe_join against app_root docs = pickle.load(f) - with open(table_idx_path, "rb") as f: + with open(table_idx_path, "rb") as f: # nosemgrep: path validated by _safe_join against app_root table_to_idx = pickle.load(f) embs = np.load(embs_path, mmap_mode="r") @@ -613,7 +611,7 @@ def whoami() -> Dict[str, Any]: mimetype=APPLICATION_JSON, ) except ValueError as ve: - frappe.throw(_("{0}\n Check Quick Start Guide Here 👇:\n {1}").format(ve,CHANGAI_GUIDE_LINK)) + frappe.throw(_("{0}\n Check Quick Start Guide Here 👇:\n {1}").format(str(ve),CHANGAI_GUIDE_LINK)) @@ -1167,28 +1165,14 @@ def build_hnsw_index(embeddings): return index -@frappe.whitelist(allow_guest=True) def call_retrieve_multi_line(user_question: str, request_id: str) -> Dict[str, Any]: try: - # publish_pipeline_update( - # request_id, - # "table_retrieval", - # "Searching relevant tables" - # ) top_tables = call_fvs_table_search(user_question) publish_pipeline_update( request_id, "table_retrieval_done", - "Tables retrieved" + _("Tables retrieved") ) - # table_prompt = FILTER_TABLES.replace("{user_question}", user_question) - # table_prompt = table_prompt.replace("{table_list}", json.dumps(top_tables, ensure_ascii=False)) - # selected_raw = call_gemini(table_prompt) - # selected_tables = _parse_json_list(selected_raw) - # top_set = set(top_tables) - # selected_tables = [t for t in selected_tables if t in top_set] - # if not selected_tables: - # return {"selected_fields": {}, "selected_tables": [], "top_tables": top_tables} fields_candidates= call_fvs_field_search_global_k( user_question, selected_tables=top_tables, @@ -1199,13 +1183,6 @@ def call_retrieve_multi_line(user_question: str, request_id: str) -> Dict[str, A "field_retrieval_done", "Fields selected" ) - # field_prompt = filter_fields.replace("{user_question}", user_question) - # field_prompt = field_prompt.replace("{fields_tables}", json.dumps(fields_candidates, ensure_ascii=False)) - # selected_raw = call_gemini(field_prompt) - # try: - # selected_map = json.loads(selected_raw) if isinstance(selected_raw, str) else {} - # except Exception: - # selected_map = {} return { "selected_fields": fields_candidates, "selected_tables": top_tables, @@ -1282,8 +1259,7 @@ def call_fvs_field_search_global_k( if meta.get("options"): opts = meta["options"] if isinstance(opts, list): - name += " {" + ", ".join(map(str, opts[:5])) + "}" - + name += " {" + ", ".join(str(o) for o in opts[:5]) + "}" grouped.setdefault(table, []).append(name) # 🔥 final compact string @@ -2395,7 +2371,7 @@ def run_text2sql_pipeline(user_question: str, chat_id: str, request_id: str) -> publish_pipeline_update( request_id, "sql_validated", - "SQL valididation Completed" + _("SQL validation Completed") ) orm = clean_sql(final.get("orm")) or "" formatted_q = _safe_strip(final.get("formatted_q") or "") From 9cf2f14a8261d8bc923a784db6c5d486e2a91095 Mon Sep 17 00:00:00 2001 From: Hyrin-mansoor Date: Tue, 12 May 2026 08:45:16 +0300 Subject: [PATCH 2/2] refactor: update file handling to enhance security checks and improve code clarity --- .../changai/api/v2/build_cards_faiss_index_v2.py | 13 ++++++------- changai/changai/api/v2/non_erp_handler.py | 13 +++++++------ changai/changai/api/v2/text2sql_pipeline_v2.py | 4 +++- 3 files changed, 16 insertions(+), 14 deletions(-) diff --git a/changai/changai/api/v2/build_cards_faiss_index_v2.py b/changai/changai/api/v2/build_cards_faiss_index_v2.py index 5fab8be..9e1cad2 100644 --- a/changai/changai/api/v2/build_cards_faiss_index_v2.py +++ b/changai/changai/api/v2/build_cards_faiss_index_v2.py @@ -95,7 +95,6 @@ def _read_file_doc(file_name: str, folder: str = RAG_FOLDER) -> str: def _load_yaml_from_file_doc(file_name: str) -> Any: """Load a YAML file from Frappe File DocType.""" - content = _read_file_doc(file_name) return yaml.safe_load(content) @@ -205,8 +204,8 @@ def clean_schema(schema: Dict[str, Any], output_path: str): field for field in fields if field.get("name") not in GENERIC_FIELDS ] - - with open(output_path, "w") as f: # nosemgrep: output_path is derived from frappe.get_app_path, not user input + # nosemgrep: frappe-semgrep-rules.rules.security.frappe-security-file-traversal + with open(output_path, "w") as f: yaml.dump(schema, f, allow_unicode=True, sort_keys=False) print(f"Cleaned schema written to {output_path}") @@ -399,11 +398,11 @@ def save_field_matrix(schema_docs, base_dir): safe_dir.mkdir(parents=True, exist_ok=True) np.save(safe_dir / "field_embs.npy", embs) - - with open(safe_dir / "field_docs.pkl", "wb") as f: # nosemgrep: path validated by _assert_dir_inside_base + # nosemgrep: frappe-semgrep-rules.rules.security.frappe-security-file-traversal + with open(safe_dir / "field_docs.pkl", "wb") as f: pickle.dump(schema_docs, f) - - with open(safe_dir / "table_to_idx.pkl", "wb") as f: # nosemgrep: path validated by _assert_dir_inside_base + # nosemgrep: frappe-semgrep-rules.rules.security.frappe-security-file-traversal + with open(safe_dir / "table_to_idx.pkl", "wb") as f: pickle.dump(table_to_idx, f) diff --git a/changai/changai/api/v2/non_erp_handler.py b/changai/changai/api/v2/non_erp_handler.py index 1d8aabf..06d6574 100644 --- a/changai/changai/api/v2/non_erp_handler.py +++ b/changai/changai/api/v2/non_erp_handler.py @@ -36,7 +36,8 @@ def __init__(self, json_file: str, alias_path: str): self._arabic_detect_re = re.compile(r"[\u0600-\u06FF]") t1 = time.time() - with open(alias_path, "r", encoding="utf-8") as f: # nosemgrep: path is from frappe.get_app_path, validated in __init__ + # nosemgrep: frappe-semgrep-rules.rules.security.frappe-security-file-traversal + with open(alias_path, "r", encoding="utf-8") as f: alias_map = json.load(f) print(f"[non_erp] alias json load: {time.time() - t1:.4f}s") @@ -123,8 +124,8 @@ def _build_from_json(self) -> None: self.entries.clear() self.responses_by_key.clear() self.keys.clear() - - with open(self.json_file, "r", encoding="utf-8") as f: # nosemgrep: path is from frappe.get_app_path, validated in __init__ + # nosemgrep: frappe-semgrep-rules.rules.security.frappe-security-file-traversal + with open(self.json_file, "r", encoding="utf-8") as f: rows = json.load(f) processed_rows = [] @@ -174,15 +175,15 @@ def _write_pickle_cache(self, cache_path: str) -> None: rows = getattr(self, "_processed_rows_for_pickle", None) if rows is None: return - - with open(cache_path, "wb") as f: # nosemgrep: cache_path derived from self.json_file, validated in __init__ + # nosemgrep: frappe-semgrep-rules.rules.security.frappe-security-file-traversal + with open(cache_path, "wb") as f: pickle.dump(rows, f, protocol=pickle.HIGHEST_PROTOCOL) def _load_from_pickle(self, cache_path: str) -> None: self.entries.clear() self.responses_by_key.clear() self.keys.clear() - + # nosemgrep: frappe-semgrep-rules.rules.security.frappe-security-file-traversal with open(cache_path, "rb") as f: # nosemgrep: cache_path derived from self.json_file, validated in __init__ rows = pickle.load(f) diff --git a/changai/changai/api/v2/text2sql_pipeline_v2.py b/changai/changai/api/v2/text2sql_pipeline_v2.py index 512a7bc..65bf3a1 100644 --- a/changai/changai/api/v2/text2sql_pipeline_v2.py +++ b/changai/changai/api/v2/text2sql_pipeline_v2.py @@ -363,9 +363,11 @@ def load_field_matrix(): if not embs_path.exists(): frappe.throw(f"Missing field_embs.npy. Rebuild schema FVS first: {embs_path}") - with open(docs_path, "rb") as f: # nosemgrep: path validated by _safe_join against app_root + # nosemgrep: frappe-semgrep-rules.rules.security.frappe-security-file-traversal + with open(docs_path, "rb") as f: docs = pickle.load(f) + # nosemgrep: frappe-semgrep-rules.rules.security.frappe-security-file-traversal with open(table_idx_path, "rb") as f: # nosemgrep: path validated by _safe_join against app_root table_to_idx = pickle.load(f)