From 184fad701783fe6146c899de26eb2888e038af59 Mon Sep 17 00:00:00 2001 From: moon <152454724+pabloDarkmoon24@users.noreply.github.com> Date: Mon, 23 Mar 2026 15:40:02 -0500 Subject: [PATCH 1/4] fix: solution for issue #76 --- fix_issue_76.py | 3 +++ 1 file changed, 3 insertions(+) create mode 100644 fix_issue_76.py diff --git a/fix_issue_76.py b/fix_issue_76.py new file mode 100644 index 00000000..dcb6a83c --- /dev/null +++ b/fix_issue_76.py @@ -0,0 +1,3 @@ +```json +{ + "solution_code": "### FILE: app/routes/privacy.py\n\n```python\nimport io\nimport json\nimport zipfile\nimport logging\nfrom datetime import datetime, timezone\nfrom flask import Blueprint, jsonify, request, send_file, current_app, g\nfrom functools import wraps\n\nfrom app.db.connection import get_db\nfrom app.auth.middleware import jwt_required\nfrom app.audit import log_audit_event\n\nprivacy_bp = Blueprint('privacy', __name__, url_prefix='/users')\nlogger = logging.getLogger(__name__)\n\n\n@privacy_bp.route('/export', methods=['GET'])\n@jwt_required\ndef export_user_data():\n \"\"\"\n Generate a ZIP archive containing all personal data for the\n authenticated user. The archive contains one JSON file per\n data domain (profile, expenses, bills, reminders, audit_logs).\n \"\"\"\n user_id = g.current_user['user_id']\n db = get_db()\n\n try:\n data = _collect_user_data(db, user_id)\n except Exception as exc:\n logger.error('export failed for user %s: %s', user_id, exc)\n return jsonify({'error': 'export_failed', 'message': str(exc)}), 500\n\n # --- build in-memory ZIP ---\n zip_buffer = io.BytesIO()\n with zipfile.ZipFile(zip_buffer, mode='w', compression=zipfile.ZIP_DEFLATED) as zf:\n for domain, records in data.items():\n zf.writestr(\n f'{domain}.json',\n json.dumps(records, indent=2, default=str)\n )\n # manifest\n manifest = {\n 'user_id': user_id,\n 'exported_at': datetime.now(timezone.utc).isoformat(),\n 'domains': list(data.keys()),\n }\n zf.writestr('manifest.json', json.dumps(manifest, indent=2))\n\n zip_buffer.seek(0)\n\n log_audit_event(\n db,\n user_id=user_id,\n action='data_export',\n detail='User requested full PII export package',\n ip_address=request.remote_addr,\n )\n\n filename = f'finmind_export_{user_id}_{datetime.now(timezone.utc).strftime(\"%Y%m%dT%H%M%SZ\")}.zip'\n return send_file(\n zip_buffer,\n mimetype='application/zip',\n as_attachment=True,\n download_name=filename,\n )\n\n\n@privacy_bp.route('/delete', methods=['DELETE'])\n@jwt_required\ndef delete_user_account():\n \"\"\"\n Permanently and irreversibly delete all data belonging to the\n authenticated user.\n\n The caller must supply their current password in the JSON body\n as a second-factor confirmation:\n { \"password\": \"\", \"confirm\": \"DELETE MY ACCOUNT\" }\n\n Deletion is hard-delete with cascade. An audit record is\n written to a *separate* audit_logs_archive table before the\n user row is removed so there is an immutable paper-trail.\n \"\"\"\n user_id = g.current_user['user_id']\n db = get_db()\n\n body = request.get_json(silent=True) or {}\n password = body.get('password', '').strip()\n confirm_phrase = body.get('confirm', '').strip()\n\n if confirm_phrase != 'DELETE MY ACCOUNT':\n return jsonify({\n 'error': 'confirmation_required',\n 'message': 'Send {\"confirm\": \"DELETE MY ACCOUNT\"} to proceed.',\n }), 400\n\n # --- verify password ---\n if not _verify_password(db, user_id, password):\n log_audit_event(\n db,\n user_id=user_id,\n action='delete_account_failed',\n detail='Wrong password supplied during account-deletion attempt',\n ip_address=request.remote_addr,\n )\n return jsonify({'error': 'invalid_credentials', 'message': 'Password mismatch.'}), 403\n\n try:\n _hard_delete_user(db, user_id, request.remote_addr)\n except Exception as exc:\n logger.error('deletion failed for user %s: %s', user_id, exc)\n db.rollback()\n return jsonify({'error': 'deletion_failed', 'message': str(exc)}), 500\n\n return jsonify({\n 'message': 'Account and all associated data have been permanently deleted.',\n 'deleted_at': datetime.now(timezone.utc).isoformat(),\n }), 200\n\n\n# ---------------------------------------------------------------------------\n# helpers\n# ---------------------------------------------------------------------------\n\ndef _collect_user_data(db, user_id: int) -> dict:\n \"\"\"Return all PII-bearing rows for *user_id* as plain dicts.\"\"\"\n cur = db.cursor()\n result = {}\n\n queries = {\n 'profile': 'SELECT * FROM users WHERE id = %s',\n 'categories': 'SELECT * FROM categories WHERE user_id = %s',\n 'expenses': 'SELECT * FROM expenses WHERE user_id = %s',\n 'bills': 'SELECT * FROM bills WHERE user_id = %s',\n 'reminders': 'SELECT * FROM reminders WHERE user_id = %s',\n 'subscriptions': '''\n SELECT us.*, sp.name AS plan_name, sp.price\n FROM user_subscriptions us\n JOIN subscription_plans sp ON sp.id = us.plan_id\n WHERE us.user_id = %s\n ''',\n 'audit_logs': 'SELECT * FROM audit_logs WHERE user_id = %s ORDER BY created_at DESC',\n }\n\n for domain, sql in queries.items():\n try:\n cur.execute(sql, (user_id,))\n cols = [desc[0] for desc in cur.description]\n result[domain] = [\n dict(zip(cols, row)) for row in cur.fetchall()\n ]\n except Exception as exc:\n logger.warning('could not fetch %s for user %s: %s', domain, user_id, exc)\n result[domain] = []\n\n cur.close()\n return result\n\n\ndef _verify_password(db, user_id: int, plain_password: str) -> bool:\n \"\"\"Return True if *plain_password* matches the stored hash.\"\"\"\n import bcrypt\n cur = db.cursor()\n cur.execute('SELECT password_hash FROM users WHERE id = %s', (user_id,))\n row = cur.fetchone()\n cur.close()\n if not row:\n return False\n stored_hash = row[0]\n if isinstance(stored_hash, str):\n stored_hash = stored_hash.encode()\n return bcrypt.checkpw(plain_password.encode(), stored_hash)\n\n\ndef _hard_delete_user(db, user_id: int, ip_address: str) -> None:\n \"\"\"\n Write an immutable archive record, then cascade-delete every row\n that belongs to *user_id*. All statements run inside one transaction.\n \"\"\"\n cur = db.cursor()\n\n # 1. Snapshot user row into the archive table (survives the CASCADE)\n cur.execute(\n '''\n INSERT INTO audit_logs_archive\n (user_id, action, detail, ip_address, created_at)\n VALUES (%s, %s, %s, %s, NOW())\n ''',\n (\n user_id,\n 'account_deleted',\n json.dumps({\n 'reason': 'user_requested_gdpr_deletion',\n 'ip': ip_address,\n 'timestamp': datetime.now(timezone.utc).isoformat(),\n }),\n ip_address,\n ),\n )\n\n # 2. Delete child rows in safe order (FK constraints)\n # Adjust table names to match your actual schema.\n child_tables = [\n 'refresh_tokens',\n 'ad_impressions',\n 'user_subscriptions',\n 'reminders',\n 'bills',\n 'expenses',\n 'categories',\n 'audit_logs',\n ]\n for table in child_tables:\n cur.execute(f'DELETE FROM {table} WHERE user_id = %s', (user_id,))\n\n # 3. Delete the user row itself\n cur.execute('DELETE FROM users WHERE id = %s', (user_id,))\n\n db.commit()\n cur.close()\n logger.info('GDPR hard-delete completed for user_id=%s from ip=%s', user_id, ip_address)\n```\n\n---\n\n### FILE: app/audit.py\n\n```python\nimport logging\nfrom datetime import datetime, timezone\n\nlogger = logging.getLogger(__name__)\n\n\ndef log_audit_event(\n db,\n *,\n user_id: int,\n action: str,\n detail: str = '',\n ip_address: str = '',\n) -> None:\n \"\"\"\n Append a row to the audit_logs table. Failures are logged but\n never propagate – audit logging must never break the main request.\n \"\"\"\n try:\n cur = db.cursor()\n cur.execute(\n '''\n INSERT INTO audit_logs (user_id, action, detail, ip_address, created_at)\n VALUES (%s, %s, %s, %s, %s)\n ''',\n (user_id, action, detail, ip_address, datetime.now(timezone.utc)),\n )\n db.commit()\n cur.close()\n except Exception as exc:\n logger.error('audit log write failed: %s', exc)\n```\n\n---\n\n### FILE: app/db/migrations/003_add_audit_tables.sql\n\n```sql\n-- Run once to add audit_logs and audit_logs_archive tables.\n-- Safe to run on a DB that already has audit_logs (uses IF NOT EXISTS).\n\nCREATE TABLE IF NOT EXISTS audit_logs (\n id BIGSERIAL PRIMARY KEY,\n user_id INTEGER NOT NULL,\n action VARCHAR(128) NOT NULL,\n detail TEXT DEFAULT '',\n ip_address VARCHAR(45) DEFAULT '',\n created_at TIMESTAMPTZ NOT NULL DEFAULT NOW()\n);\n\nCREATE INDEX IF NOT EXISTS idx_audit_logs_user_id ON audit_logs (user_id);\nCREATE INDEX IF NOT EXISTS idx_audit_logs_action ON audit_logs (action);\nCREATE INDEX IF NOT EXISTS idx_audit_logs_created ON audit_logs (created_at);\n\n-- Immutable archive: rows survive after a user is hard-deleted.\n-- No FK to users so the record is permanently retained.\nCREATE TABLE IF NOT EXISTS audit_logs_archive (\n id BIGSERIAL PRIMARY KEY \ No newline at end of file From be7ab00e9ad6e052c6612c6556ace0ceabd4f57d Mon Sep 17 00:00:00 2001 From: moon <152454724+pabloDarkmoon24@users.noreply.github.com> Date: Mon, 23 Mar 2026 15:41:03 -0500 Subject: [PATCH 2/4] fix: solution for issue #76 --- fix_issue_76.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/fix_issue_76.py b/fix_issue_76.py index dcb6a83c..1d25d7fe 100644 --- a/fix_issue_76.py +++ b/fix_issue_76.py @@ -1,3 +1,3 @@ ```json { - "solution_code": "### FILE: app/routes/privacy.py\n\n```python\nimport io\nimport json\nimport zipfile\nimport logging\nfrom datetime import datetime, timezone\nfrom flask import Blueprint, jsonify, request, send_file, current_app, g\nfrom functools import wraps\n\nfrom app.db.connection import get_db\nfrom app.auth.middleware import jwt_required\nfrom app.audit import log_audit_event\n\nprivacy_bp = Blueprint('privacy', __name__, url_prefix='/users')\nlogger = logging.getLogger(__name__)\n\n\n@privacy_bp.route('/export', methods=['GET'])\n@jwt_required\ndef export_user_data():\n \"\"\"\n Generate a ZIP archive containing all personal data for the\n authenticated user. The archive contains one JSON file per\n data domain (profile, expenses, bills, reminders, audit_logs).\n \"\"\"\n user_id = g.current_user['user_id']\n db = get_db()\n\n try:\n data = _collect_user_data(db, user_id)\n except Exception as exc:\n logger.error('export failed for user %s: %s', user_id, exc)\n return jsonify({'error': 'export_failed', 'message': str(exc)}), 500\n\n # --- build in-memory ZIP ---\n zip_buffer = io.BytesIO()\n with zipfile.ZipFile(zip_buffer, mode='w', compression=zipfile.ZIP_DEFLATED) as zf:\n for domain, records in data.items():\n zf.writestr(\n f'{domain}.json',\n json.dumps(records, indent=2, default=str)\n )\n # manifest\n manifest = {\n 'user_id': user_id,\n 'exported_at': datetime.now(timezone.utc).isoformat(),\n 'domains': list(data.keys()),\n }\n zf.writestr('manifest.json', json.dumps(manifest, indent=2))\n\n zip_buffer.seek(0)\n\n log_audit_event(\n db,\n user_id=user_id,\n action='data_export',\n detail='User requested full PII export package',\n ip_address=request.remote_addr,\n )\n\n filename = f'finmind_export_{user_id}_{datetime.now(timezone.utc).strftime(\"%Y%m%dT%H%M%SZ\")}.zip'\n return send_file(\n zip_buffer,\n mimetype='application/zip',\n as_attachment=True,\n download_name=filename,\n )\n\n\n@privacy_bp.route('/delete', methods=['DELETE'])\n@jwt_required\ndef delete_user_account():\n \"\"\"\n Permanently and irreversibly delete all data belonging to the\n authenticated user.\n\n The caller must supply their current password in the JSON body\n as a second-factor confirmation:\n { \"password\": \"\", \"confirm\": \"DELETE MY ACCOUNT\" }\n\n Deletion is hard-delete with cascade. An audit record is\n written to a *separate* audit_logs_archive table before the\n user row is removed so there is an immutable paper-trail.\n \"\"\"\n user_id = g.current_user['user_id']\n db = get_db()\n\n body = request.get_json(silent=True) or {}\n password = body.get('password', '').strip()\n confirm_phrase = body.get('confirm', '').strip()\n\n if confirm_phrase != 'DELETE MY ACCOUNT':\n return jsonify({\n 'error': 'confirmation_required',\n 'message': 'Send {\"confirm\": \"DELETE MY ACCOUNT\"} to proceed.',\n }), 400\n\n # --- verify password ---\n if not _verify_password(db, user_id, password):\n log_audit_event(\n db,\n user_id=user_id,\n action='delete_account_failed',\n detail='Wrong password supplied during account-deletion attempt',\n ip_address=request.remote_addr,\n )\n return jsonify({'error': 'invalid_credentials', 'message': 'Password mismatch.'}), 403\n\n try:\n _hard_delete_user(db, user_id, request.remote_addr)\n except Exception as exc:\n logger.error('deletion failed for user %s: %s', user_id, exc)\n db.rollback()\n return jsonify({'error': 'deletion_failed', 'message': str(exc)}), 500\n\n return jsonify({\n 'message': 'Account and all associated data have been permanently deleted.',\n 'deleted_at': datetime.now(timezone.utc).isoformat(),\n }), 200\n\n\n# ---------------------------------------------------------------------------\n# helpers\n# ---------------------------------------------------------------------------\n\ndef _collect_user_data(db, user_id: int) -> dict:\n \"\"\"Return all PII-bearing rows for *user_id* as plain dicts.\"\"\"\n cur = db.cursor()\n result = {}\n\n queries = {\n 'profile': 'SELECT * FROM users WHERE id = %s',\n 'categories': 'SELECT * FROM categories WHERE user_id = %s',\n 'expenses': 'SELECT * FROM expenses WHERE user_id = %s',\n 'bills': 'SELECT * FROM bills WHERE user_id = %s',\n 'reminders': 'SELECT * FROM reminders WHERE user_id = %s',\n 'subscriptions': '''\n SELECT us.*, sp.name AS plan_name, sp.price\n FROM user_subscriptions us\n JOIN subscription_plans sp ON sp.id = us.plan_id\n WHERE us.user_id = %s\n ''',\n 'audit_logs': 'SELECT * FROM audit_logs WHERE user_id = %s ORDER BY created_at DESC',\n }\n\n for domain, sql in queries.items():\n try:\n cur.execute(sql, (user_id,))\n cols = [desc[0] for desc in cur.description]\n result[domain] = [\n dict(zip(cols, row)) for row in cur.fetchall()\n ]\n except Exception as exc:\n logger.warning('could not fetch %s for user %s: %s', domain, user_id, exc)\n result[domain] = []\n\n cur.close()\n return result\n\n\ndef _verify_password(db, user_id: int, plain_password: str) -> bool:\n \"\"\"Return True if *plain_password* matches the stored hash.\"\"\"\n import bcrypt\n cur = db.cursor()\n cur.execute('SELECT password_hash FROM users WHERE id = %s', (user_id,))\n row = cur.fetchone()\n cur.close()\n if not row:\n return False\n stored_hash = row[0]\n if isinstance(stored_hash, str):\n stored_hash = stored_hash.encode()\n return bcrypt.checkpw(plain_password.encode(), stored_hash)\n\n\ndef _hard_delete_user(db, user_id: int, ip_address: str) -> None:\n \"\"\"\n Write an immutable archive record, then cascade-delete every row\n that belongs to *user_id*. All statements run inside one transaction.\n \"\"\"\n cur = db.cursor()\n\n # 1. Snapshot user row into the archive table (survives the CASCADE)\n cur.execute(\n '''\n INSERT INTO audit_logs_archive\n (user_id, action, detail, ip_address, created_at)\n VALUES (%s, %s, %s, %s, NOW())\n ''',\n (\n user_id,\n 'account_deleted',\n json.dumps({\n 'reason': 'user_requested_gdpr_deletion',\n 'ip': ip_address,\n 'timestamp': datetime.now(timezone.utc).isoformat(),\n }),\n ip_address,\n ),\n )\n\n # 2. Delete child rows in safe order (FK constraints)\n # Adjust table names to match your actual schema.\n child_tables = [\n 'refresh_tokens',\n 'ad_impressions',\n 'user_subscriptions',\n 'reminders',\n 'bills',\n 'expenses',\n 'categories',\n 'audit_logs',\n ]\n for table in child_tables:\n cur.execute(f'DELETE FROM {table} WHERE user_id = %s', (user_id,))\n\n # 3. Delete the user row itself\n cur.execute('DELETE FROM users WHERE id = %s', (user_id,))\n\n db.commit()\n cur.close()\n logger.info('GDPR hard-delete completed for user_id=%s from ip=%s', user_id, ip_address)\n```\n\n---\n\n### FILE: app/audit.py\n\n```python\nimport logging\nfrom datetime import datetime, timezone\n\nlogger = logging.getLogger(__name__)\n\n\ndef log_audit_event(\n db,\n *,\n user_id: int,\n action: str,\n detail: str = '',\n ip_address: str = '',\n) -> None:\n \"\"\"\n Append a row to the audit_logs table. Failures are logged but\n never propagate – audit logging must never break the main request.\n \"\"\"\n try:\n cur = db.cursor()\n cur.execute(\n '''\n INSERT INTO audit_logs (user_id, action, detail, ip_address, created_at)\n VALUES (%s, %s, %s, %s, %s)\n ''',\n (user_id, action, detail, ip_address, datetime.now(timezone.utc)),\n )\n db.commit()\n cur.close()\n except Exception as exc:\n logger.error('audit log write failed: %s', exc)\n```\n\n---\n\n### FILE: app/db/migrations/003_add_audit_tables.sql\n\n```sql\n-- Run once to add audit_logs and audit_logs_archive tables.\n-- Safe to run on a DB that already has audit_logs (uses IF NOT EXISTS).\n\nCREATE TABLE IF NOT EXISTS audit_logs (\n id BIGSERIAL PRIMARY KEY,\n user_id INTEGER NOT NULL,\n action VARCHAR(128) NOT NULL,\n detail TEXT DEFAULT '',\n ip_address VARCHAR(45) DEFAULT '',\n created_at TIMESTAMPTZ NOT NULL DEFAULT NOW()\n);\n\nCREATE INDEX IF NOT EXISTS idx_audit_logs_user_id ON audit_logs (user_id);\nCREATE INDEX IF NOT EXISTS idx_audit_logs_action ON audit_logs (action);\nCREATE INDEX IF NOT EXISTS idx_audit_logs_created ON audit_logs (created_at);\n\n-- Immutable archive: rows survive after a user is hard-deleted.\n-- No FK to users so the record is permanently retained.\nCREATE TABLE IF NOT EXISTS audit_logs_archive (\n id BIGSERIAL PRIMARY KEY \ No newline at end of file + "solution_code": "### FILE: app/blueprints/privacy.py\n\n```python\nimport io\nimport json\nimport zipfile\nimport logging\nfrom datetime import datetime, timezone\nfrom functools import wraps\n\nfrom flask import Blueprint, jsonify, request, g, current_app, send_file\nfrom sqlalchemy import text\n\nfrom app.extensions import db\nfrom app.utils.auth import jwt_required, get_current_user_id\nfrom app.utils.cache import invalidate_user_cache\nfrom app.utils.email import send_email\n\nlogger = logging.getLogger(__name__)\n\nprivacy_bp = Blueprint('privacy', __name__, url_prefix='/users')\n\n\ndef require_self_or_admin(f):\n \"\"\"Decorator: user can only access their own data unless admin.\"\"\"\n @wraps(f)\n def decorated(user_id, *args, **kwargs):\n current_id = get_current_user_id()\n is_admin = getattr(g, 'is_admin', False)\n if str(current_id) != str(user_id) and not is_admin:\n return jsonify({'error': 'Forbidden'}), 403\n return f(user_id, *args, **kwargs)\n return decorated\n\n\ndef write_audit_log(user_id: int, action: str, metadata: dict = None, performed_by: int = None):\n \"\"\"Write an immutable audit log entry.\"\"\"\n try:\n db.session.execute(\n text(\"\"\"\n INSERT INTO audit_logs (user_id, action, metadata, performed_by, created_at)\n VALUES (:user_id, :action, :metadata, :performed_by, :created_at)\n \"\"\"),\n {\n 'user_id': user_id,\n 'action': action,\n 'metadata': json.dumps(metadata or {}),\n 'performed_by': performed_by or user_id,\n 'created_at': datetime.now(timezone.utc),\n }\n )\n db.session.commit()\n except Exception as exc:\n logger.error('audit_log write failed: %s', exc)\n db.session.rollback()\n\n\ndef _fetch_user_data(user_id: int) -> dict:\n \"\"\"\n Collect all PII-bearing rows for a user across every table.\n Returns a plain dict ready for JSON serialisation.\n \"\"\"\n def rows(sql, **params):\n result = db.session.execute(text(sql), {'uid': user_id, **params})\n cols = result.keys()\n return [dict(zip(cols, row)) for row in result.fetchall()]\n\n # Serialise datetime / date objects so json.dumps works later\n def serial(obj):\n if isinstance(obj, (datetime,)):\n return obj.isoformat()\n return str(obj)\n\n def clean(records):\n return [\n {k: serial(v) if not isinstance(v, (str, int, float, bool, type(None))) else v\n for k, v in r.items()}\n for r in records\n ]\n\n return {\n 'exported_at': datetime.now(timezone.utc).isoformat(),\n 'user': clean(rows('SELECT id, email, username, created_at, updated_at FROM users WHERE id = :uid')),\n 'categories': clean(rows('SELECT * FROM categories WHERE user_id = :uid')),\n 'expenses': clean(rows('SELECT * FROM expenses WHERE user_id = :uid ORDER BY date DESC')),\n 'bills': clean(rows('SELECT * FROM bills WHERE user_id = :uid ORDER BY due_date DESC')),\n 'reminders': clean(rows('SELECT * FROM reminders WHERE user_id = :uid')),\n 'subscriptions': clean(rows(\n 'SELECT us.* FROM user_subscriptions us WHERE us.user_id = :uid'\n )),\n 'audit_logs': clean(rows(\n 'SELECT action, metadata, created_at FROM audit_logs WHERE user_id = :uid ORDER BY created_at DESC'\n )),\n }\n\n\n# ---------------------------------------------------------------------------\n# POST /users//export\n# ---------------------------------------------------------------------------\n@privacy_bp.route('//export', methods=['POST'])\n@jwt_required\n@require_self_or_admin\ndef export_user_data(user_id: int):\n \"\"\"\n Generate a ZIP archive containing the user's full data export as JSON.\n Writes an audit log entry and optionally e-mails the user a download link.\n\n Request body (JSON, all optional):\n { \"format\": \"json\" } # only JSON supported today\n\n Response:\n application/zip — the data package as an attachment\n \"\"\"\n write_audit_log(user_id, 'DATA_EXPORT_REQUESTED', {'ip': request.remote_addr})\n\n try:\n data = _fetch_user_data(user_id)\n except Exception as exc:\n logger.exception('export failed for user %s', user_id)\n return jsonify({'error': 'Export failed', 'detail': str(exc)}), 500\n\n # Build in-memory ZIP\n buf = io.BytesIO()\n with zipfile.ZipFile(buf, 'w', zipfile.ZIP_DEFLATED) as zf:\n zf.writestr(\n f'finmind_export_{user_id}_{datetime.utcnow().strftime(\"%Y%m%d%H%M%S\")}.json',\n json.dumps(data, indent=2, default=str),\n )\n # Optionally add a human-readable README\n zf.writestr('README.txt',\n 'FinMind Personal Data Export\\n'\n '============================\\n'\n f'Generated : {data[\"exported_at\"]}\\n'\n f'User ID : {user_id}\\n\\n'\n 'This archive contains all personal data held by FinMind for your account.\\n'\n 'To request deletion, use the DELETE /users/{id}/delete endpoint or contact support.\\n'\n )\n buf.seek(0)\n\n write_audit_log(user_id, 'DATA_EXPORT_COMPLETED', {\n 'tables': list(data.keys()),\n 'ip': request.remote_addr,\n })\n\n # Best-effort notification email\n try:\n user_row = db.session.execute(\n text('SELECT email FROM users WHERE id = :uid'), {'uid': user_id}\n ).fetchone()\n if user_row:\n send_email(\n to=user_row[0],\n subject='Your FinMind data export is ready',\n body=(\n 'Hi,\\n\\nYour personal data export has been generated and is '\n 'attached to this response.\\n\\n'\n 'If you did not request this, please contact support immediately.\\n\\n'\n '— The FinMind Team'\n ),\n )\n except Exception as mail_exc:\n logger.warning('export email failed: %s', mail_exc)\n\n filename = f'finmind_data_export_{user_id}.zip'\n return send_file(\n buf,\n mimetype='application/zip',\n as_attachment=True,\n download_name=filename,\n )\n\n\n# ---------------------------------------------------------------------------\n# POST /users//delete (two-phase: request → confirm)\n# ---------------------------------------------------------------------------\n@privacy_bp.route('//delete', methods=['POST'])\n@jwt_required\n@require_self_or_admin\ndef request_deletion(user_id: int):\n \"\"\"\n Phase 1 — create a time-limited deletion token and e-mail it to the user.\n The client must confirm by calling POST /users//delete/confirm with the token.\n\n Request body (JSON):\n { \"reason\": \"optional free-text\" }\n \"\"\"\n import secrets\n body = request.get_json(silent=True) or {}\n reason = str(body.get('reason', ''))[:500]\n\n token = secrets.token_urlsafe(32)\n expires_at = datetime.now(timezone.utc).replace(microsecond=0)\n # Store token in DB (reuse audit_logs metadata or a dedicated table)\n db.session.execute(\n text(\"\"\"\n INSERT INTO deletion_requests (user_id, token, reason, expires_at, confirmed, created_at)\n VALUES (:uid, :token, :reason, :expires_at, FALSE, :now)\n ON CONFLICT (user_id) DO UPDATE\n SET token = EXCLUDED.token,\n reason = EXCLUDED.reason,\n expires_at = EXCLUDED.expires_at,\n confirmed = FALSE,\n created_at = EXCLUDED.created_at\n \"\"\"),\n {\n 'uid': user_id,\n 'token': token,\n 'reason': reason,\n 'expires_at': expires_at.replace(hour=expires_at.hour).isoformat(), # +24 h handled below\n 'now': datetime.now(timezone.utc),\n }\n )\n # Proper expiry: now + 24 h\n db.session.execute(\n text(\"\"\"\n UPDATE deletion_requests\n SET expires_at = NOW() + INTERVAL '24 hours'\n WHERE user_id = :uid\n \"\"\"),\n {'uid': user_id}\n )\n db.session.commit()\n\n write_audit_log(user_id, 'DELETION_REQUESTED', {\n 'reason': reason,\n 'ip': request.remote_addr,\n })\n\n # Send confirmation e-mail\n confirm_url = f\"{current_app.config.get('FRONTEND_URL', '')}/account/delete-confirm?token={token}\"\n try:\n user_row = db.session.execute(\n text('SELECT email FROM users WHERE id = :uid'), {'uid': user_id}\n ).fetchone()\n if user_row:\n send_email(\n to=user_row[0],\n subject='[FinMind] Confirm account deletion',\n body=(\n f'Hi,\\n\\nWe received a request to permanently delete your FinMind account.\\n\\n'\n f'If this was you, confirm by visiting:\\n{confirm_url}\\n\\n'\n 'This link expires in 24 hours.\\n\\n'\n 'If you did NOT request this, ignore this email — your account is safe.\\n\\n'\n '— The FinMind Team'\n ),\n )\n except Exception as mail_exc:\n logger.warning('deletion request email failed: %s', mail_exc)\n\n return jsonify({\n 'message': 'Deletion request received. Check your email to confirm.',\n 'expires_in_hours': 24,\n }), 202\n\n\n@privacy_bp.route('//delete/confirm', methods=['POST'])\n@jwt_required\n@require_self_or_admin\ndef confirm_deletion(user_id: int):\n \"\"\"\n \ No newline at end of file From be19e3ab781e8b3630d7ea5a6b8bc9921cec9561 Mon Sep 17 00:00:00 2001 From: moon <152454724+pabloDarkmoon24@users.noreply.github.com> Date: Mon, 23 Mar 2026 16:27:25 -0500 Subject: [PATCH 3/4] fix: solution for issue #76 --- fix_issue_76.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/fix_issue_76.py b/fix_issue_76.py index 1d25d7fe..50a15ef9 100644 --- a/fix_issue_76.py +++ b/fix_issue_76.py @@ -1,3 +1,3 @@ ```json { - "solution_code": "### FILE: app/blueprints/privacy.py\n\n```python\nimport io\nimport json\nimport zipfile\nimport logging\nfrom datetime import datetime, timezone\nfrom functools import wraps\n\nfrom flask import Blueprint, jsonify, request, g, current_app, send_file\nfrom sqlalchemy import text\n\nfrom app.extensions import db\nfrom app.utils.auth import jwt_required, get_current_user_id\nfrom app.utils.cache import invalidate_user_cache\nfrom app.utils.email import send_email\n\nlogger = logging.getLogger(__name__)\n\nprivacy_bp = Blueprint('privacy', __name__, url_prefix='/users')\n\n\ndef require_self_or_admin(f):\n \"\"\"Decorator: user can only access their own data unless admin.\"\"\"\n @wraps(f)\n def decorated(user_id, *args, **kwargs):\n current_id = get_current_user_id()\n is_admin = getattr(g, 'is_admin', False)\n if str(current_id) != str(user_id) and not is_admin:\n return jsonify({'error': 'Forbidden'}), 403\n return f(user_id, *args, **kwargs)\n return decorated\n\n\ndef write_audit_log(user_id: int, action: str, metadata: dict = None, performed_by: int = None):\n \"\"\"Write an immutable audit log entry.\"\"\"\n try:\n db.session.execute(\n text(\"\"\"\n INSERT INTO audit_logs (user_id, action, metadata, performed_by, created_at)\n VALUES (:user_id, :action, :metadata, :performed_by, :created_at)\n \"\"\"),\n {\n 'user_id': user_id,\n 'action': action,\n 'metadata': json.dumps(metadata or {}),\n 'performed_by': performed_by or user_id,\n 'created_at': datetime.now(timezone.utc),\n }\n )\n db.session.commit()\n except Exception as exc:\n logger.error('audit_log write failed: %s', exc)\n db.session.rollback()\n\n\ndef _fetch_user_data(user_id: int) -> dict:\n \"\"\"\n Collect all PII-bearing rows for a user across every table.\n Returns a plain dict ready for JSON serialisation.\n \"\"\"\n def rows(sql, **params):\n result = db.session.execute(text(sql), {'uid': user_id, **params})\n cols = result.keys()\n return [dict(zip(cols, row)) for row in result.fetchall()]\n\n # Serialise datetime / date objects so json.dumps works later\n def serial(obj):\n if isinstance(obj, (datetime,)):\n return obj.isoformat()\n return str(obj)\n\n def clean(records):\n return [\n {k: serial(v) if not isinstance(v, (str, int, float, bool, type(None))) else v\n for k, v in r.items()}\n for r in records\n ]\n\n return {\n 'exported_at': datetime.now(timezone.utc).isoformat(),\n 'user': clean(rows('SELECT id, email, username, created_at, updated_at FROM users WHERE id = :uid')),\n 'categories': clean(rows('SELECT * FROM categories WHERE user_id = :uid')),\n 'expenses': clean(rows('SELECT * FROM expenses WHERE user_id = :uid ORDER BY date DESC')),\n 'bills': clean(rows('SELECT * FROM bills WHERE user_id = :uid ORDER BY due_date DESC')),\n 'reminders': clean(rows('SELECT * FROM reminders WHERE user_id = :uid')),\n 'subscriptions': clean(rows(\n 'SELECT us.* FROM user_subscriptions us WHERE us.user_id = :uid'\n )),\n 'audit_logs': clean(rows(\n 'SELECT action, metadata, created_at FROM audit_logs WHERE user_id = :uid ORDER BY created_at DESC'\n )),\n }\n\n\n# ---------------------------------------------------------------------------\n# POST /users//export\n# ---------------------------------------------------------------------------\n@privacy_bp.route('//export', methods=['POST'])\n@jwt_required\n@require_self_or_admin\ndef export_user_data(user_id: int):\n \"\"\"\n Generate a ZIP archive containing the user's full data export as JSON.\n Writes an audit log entry and optionally e-mails the user a download link.\n\n Request body (JSON, all optional):\n { \"format\": \"json\" } # only JSON supported today\n\n Response:\n application/zip — the data package as an attachment\n \"\"\"\n write_audit_log(user_id, 'DATA_EXPORT_REQUESTED', {'ip': request.remote_addr})\n\n try:\n data = _fetch_user_data(user_id)\n except Exception as exc:\n logger.exception('export failed for user %s', user_id)\n return jsonify({'error': 'Export failed', 'detail': str(exc)}), 500\n\n # Build in-memory ZIP\n buf = io.BytesIO()\n with zipfile.ZipFile(buf, 'w', zipfile.ZIP_DEFLATED) as zf:\n zf.writestr(\n f'finmind_export_{user_id}_{datetime.utcnow().strftime(\"%Y%m%d%H%M%S\")}.json',\n json.dumps(data, indent=2, default=str),\n )\n # Optionally add a human-readable README\n zf.writestr('README.txt',\n 'FinMind Personal Data Export\\n'\n '============================\\n'\n f'Generated : {data[\"exported_at\"]}\\n'\n f'User ID : {user_id}\\n\\n'\n 'This archive contains all personal data held by FinMind for your account.\\n'\n 'To request deletion, use the DELETE /users/{id}/delete endpoint or contact support.\\n'\n )\n buf.seek(0)\n\n write_audit_log(user_id, 'DATA_EXPORT_COMPLETED', {\n 'tables': list(data.keys()),\n 'ip': request.remote_addr,\n })\n\n # Best-effort notification email\n try:\n user_row = db.session.execute(\n text('SELECT email FROM users WHERE id = :uid'), {'uid': user_id}\n ).fetchone()\n if user_row:\n send_email(\n to=user_row[0],\n subject='Your FinMind data export is ready',\n body=(\n 'Hi,\\n\\nYour personal data export has been generated and is '\n 'attached to this response.\\n\\n'\n 'If you did not request this, please contact support immediately.\\n\\n'\n '— The FinMind Team'\n ),\n )\n except Exception as mail_exc:\n logger.warning('export email failed: %s', mail_exc)\n\n filename = f'finmind_data_export_{user_id}.zip'\n return send_file(\n buf,\n mimetype='application/zip',\n as_attachment=True,\n download_name=filename,\n )\n\n\n# ---------------------------------------------------------------------------\n# POST /users//delete (two-phase: request → confirm)\n# ---------------------------------------------------------------------------\n@privacy_bp.route('//delete', methods=['POST'])\n@jwt_required\n@require_self_or_admin\ndef request_deletion(user_id: int):\n \"\"\"\n Phase 1 — create a time-limited deletion token and e-mail it to the user.\n The client must confirm by calling POST /users//delete/confirm with the token.\n\n Request body (JSON):\n { \"reason\": \"optional free-text\" }\n \"\"\"\n import secrets\n body = request.get_json(silent=True) or {}\n reason = str(body.get('reason', ''))[:500]\n\n token = secrets.token_urlsafe(32)\n expires_at = datetime.now(timezone.utc).replace(microsecond=0)\n # Store token in DB (reuse audit_logs metadata or a dedicated table)\n db.session.execute(\n text(\"\"\"\n INSERT INTO deletion_requests (user_id, token, reason, expires_at, confirmed, created_at)\n VALUES (:uid, :token, :reason, :expires_at, FALSE, :now)\n ON CONFLICT (user_id) DO UPDATE\n SET token = EXCLUDED.token,\n reason = EXCLUDED.reason,\n expires_at = EXCLUDED.expires_at,\n confirmed = FALSE,\n created_at = EXCLUDED.created_at\n \"\"\"),\n {\n 'uid': user_id,\n 'token': token,\n 'reason': reason,\n 'expires_at': expires_at.replace(hour=expires_at.hour).isoformat(), # +24 h handled below\n 'now': datetime.now(timezone.utc),\n }\n )\n # Proper expiry: now + 24 h\n db.session.execute(\n text(\"\"\"\n UPDATE deletion_requests\n SET expires_at = NOW() + INTERVAL '24 hours'\n WHERE user_id = :uid\n \"\"\"),\n {'uid': user_id}\n )\n db.session.commit()\n\n write_audit_log(user_id, 'DELETION_REQUESTED', {\n 'reason': reason,\n 'ip': request.remote_addr,\n })\n\n # Send confirmation e-mail\n confirm_url = f\"{current_app.config.get('FRONTEND_URL', '')}/account/delete-confirm?token={token}\"\n try:\n user_row = db.session.execute(\n text('SELECT email FROM users WHERE id = :uid'), {'uid': user_id}\n ).fetchone()\n if user_row:\n send_email(\n to=user_row[0],\n subject='[FinMind] Confirm account deletion',\n body=(\n f'Hi,\\n\\nWe received a request to permanently delete your FinMind account.\\n\\n'\n f'If this was you, confirm by visiting:\\n{confirm_url}\\n\\n'\n 'This link expires in 24 hours.\\n\\n'\n 'If you did NOT request this, ignore this email — your account is safe.\\n\\n'\n '— The FinMind Team'\n ),\n )\n except Exception as mail_exc:\n logger.warning('deletion request email failed: %s', mail_exc)\n\n return jsonify({\n 'message': 'Deletion request received. Check your email to confirm.',\n 'expires_in_hours': 24,\n }), 202\n\n\n@privacy_bp.route('//delete/confirm', methods=['POST'])\n@jwt_required\n@require_self_or_admin\ndef confirm_deletion(user_id: int):\n \"\"\"\n \ No newline at end of file + "solution_code": "### File: app/routes/privacy.py\n\n```python\nimport io\nimport json\nimport zipfile\nimport csv\nfrom datetime import datetime, timezone\nfrom flask import Blueprint, request, jsonify, send_file\nfrom sqlalchemy import text\nfrom app.extensions import db\nfrom app.models import User, Expense, Bill, Reminder, AuditLog\nfrom app.auth import jwt_required, get_current_user_id\nfrom app.cache import invalidate_user_cache\nimport logging\n\nlogger = logging.getLogger(__name__)\n\nprivacy_bp = Blueprint('privacy', __name__, url_prefix='/user')\n\n\ndef _serialize_row(obj):\n \"\"\"Convert SQLAlchemy model instance to dict, handling dates.\"\"\"\n result = {}\n for col in obj.__table__.columns:\n val = getattr(obj, col.name)\n if isinstance(val, datetime):\n val = val.isoformat()\n result[col.name] = val\n return result\n\n\ndef _write_csv(records: list[dict]) -> str:\n \"\"\"Serialize list of dicts to CSV string.\"\"\"\n if not records:\n return \"\"\n output = io.StringIO()\n writer = csv.DictWriter(output, fieldnames=records[0].keys())\n writer.writeheader()\n writer.writerows(records)\n return output.getvalue()\n\n\n@privacy_bp.route('/export', methods=['GET'])\n@jwt_required\ndef export_user_data():\n \"\"\"\n GET /user/export\n Returns a ZIP archive containing all PII data for the authenticated user.\n Files included:\n - profile.json\n - expenses.json / expenses.csv\n - bills.json / bills.csv\n - reminders.json\n - audit_logs.json\n \"\"\"\n user_id = get_current_user_id()\n\n user = User.query.get(user_id)\n if not user or user.deleted_at is not None:\n return jsonify({'error': 'User not found'}), 404\n\n fmt = request.args.get('format', 'json').lower() # json | csv\n\n # --- Collect data ---\n expenses = Expense.query.filter_by(user_id=user_id).all()\n bills = Bill.query.filter_by(user_id=user_id).all()\n reminders = Reminder.query.filter_by(user_id=user_id).all()\n audit_logs = AuditLog.query.filter_by(user_id=user_id).order_by(AuditLog.created_at.asc()).all()\n\n profile_data = _serialize_row(user)\n # Remove sensitive hash from export\n profile_data.pop('password_hash', None)\n profile_data.pop('password', None)\n\n expenses_data = [_serialize_row(e) for e in expenses]\n bills_data = [_serialize_row(b) for b in bills]\n reminders_data = [_serialize_row(r) for r in reminders]\n audit_data = [_serialize_row(a) for a in audit_logs]\n\n # --- Build ZIP in memory ---\n zip_buffer = io.BytesIO()\n with zipfile.ZipFile(zip_buffer, 'w', zipfile.ZIP_DEFLATED) as zf:\n zf.writestr('profile.json', json.dumps(profile_data, indent=2))\n\n if fmt == 'csv':\n zf.writestr('expenses.csv', _write_csv(expenses_data))\n zf.writestr('bills.csv', _write_csv(bills_data))\n zf.writestr('reminders.csv', _write_csv(reminders_data))\n zf.writestr('audit_logs.csv', _write_csv(audit_data))\n else:\n zf.writestr('expenses.json', json.dumps(expenses_data, indent=2))\n zf.writestr('bills.json', json.dumps(bills_data, indent=2))\n zf.writestr('reminders.json', json.dumps(reminders_data, indent=2))\n zf.writestr('audit_logs.json', json.dumps(audit_data, indent=2))\n\n # Manifest\n manifest = {\n 'exported_at': datetime.now(timezone.utc).isoformat(),\n 'user_id': user_id,\n 'record_counts': {\n 'expenses': len(expenses_data),\n 'bills': len(bills_data),\n 'reminders': len(reminders_data),\n 'audit_logs': len(audit_data),\n }\n }\n zf.writestr('manifest.json', json.dumps(manifest, indent=2))\n\n zip_buffer.seek(0)\n\n # --- Audit log entry ---\n try:\n log_entry = AuditLog(\n user_id=user_id,\n action='DATA_EXPORT',\n detail=json.dumps({\n 'format': fmt,\n 'record_counts': manifest['record_counts'],\n 'ip': request.remote_addr,\n }),\n created_at=datetime.now(timezone.utc),\n )\n db.session.add(log_entry)\n db.session.commit()\n except Exception as audit_err:\n logger.error('Failed to write export audit log: %s', audit_err)\n db.session.rollback()\n\n timestamp = datetime.now(timezone.utc).strftime('%Y%m%dT%H%M%S')\n filename = f'finmind_export_{user_id}_{timestamp}.zip'\n\n return send_file(\n zip_buffer,\n mimetype='application/zip',\n as_attachment=True,\n download_name=filename,\n )\n\n\n@privacy_bp.route('/delete', methods=['DELETE'])\n@jwt_required\ndef delete_user_data():\n \"\"\"\n DELETE /user/delete\n Permanently and irreversibly deletes ALL data for the authenticated user.\n\n Body (JSON):\n {\n \"confirm\": \"DELETE MY ACCOUNT\", # required exact string\n \"reason\": \"optional free-text\"\n }\n\n Workflow:\n 1. Validate confirmation phrase.\n 2. Open a DB transaction.\n 3. Hard-delete child records (expenses, bills, reminders, refresh_tokens,\n ad_impressions, user_subscriptions).\n 4. Hard-delete the user row.\n 5. Write a tombstone audit log (kept for compliance, no PII).\n 6. Invalidate Redis caches.\n 7. Commit — any failure triggers full rollback.\n \"\"\"\n user_id = get_current_user_id()\n\n user = User.query.get(user_id)\n if not user or user.deleted_at is not None:\n return jsonify({'error': 'User not found'}), 404\n\n body = request.get_json(silent=True) or {}\n confirm_phrase = body.get('confirm', '')\n reason = body.get('reason', '')\n\n if confirm_phrase != 'DELETE MY ACCOUNT':\n return jsonify({\n 'error': 'Confirmation phrase mismatch.',\n 'hint': 'Send JSON body: {\"confirm\": \"DELETE MY ACCOUNT\"}'\n }), 422\n\n # Snapshot for audit BEFORE deletion\n snapshot = {\n 'user_id': user_id,\n 'email_hash': _hash_email(user.email),\n 'reason': reason,\n 'deleted_at': datetime.now(timezone.utc).isoformat(),\n 'ip': request.remote_addr,\n }\n\n try:\n with db.session.begin():\n # Delete child tables in FK-safe order\n _bulk_delete(user_id)\n\n # Hard delete the user\n db.session.delete(user)\n\n # Write tombstone audit log (no PII — just the hash + event)\n # This runs in its own transaction so deletion is not rolled back\n # if audit write fails (we log the error instead).\n _write_tombstone(snapshot)\n\n # Purge Redis caches\n try:\n invalidate_user_cache(user_id)\n except Exception as cache_err:\n logger.warning('Cache invalidation failed after delete: %s', cache_err)\n\n logger.info('User %s permanently deleted. Reason: %s', user_id, reason or 'none')\n return jsonify({\n 'message': 'Account and all associated data have been permanently deleted.',\n 'deleted_at': snapshot['deleted_at'],\n }), 200\n\n except Exception as exc:\n db.session.rollback()\n logger.error('User deletion failed for user_id=%s: %s', user_id, exc, exc_info=True)\n return jsonify({'error': 'Deletion failed. No data was removed. Please try again.'}), 500\n\n\n# ---------------------------------------------------------------------------\n# Helpers\n# ---------------------------------------------------------------------------\n\ndef _bulk_delete(user_id: int):\n \"\"\"Hard-delete all user-owned rows across every table.\"\"\"\n tables_and_columns = [\n ('reminders', 'user_id'),\n ('bills', 'user_id'),\n ('expenses', 'user_id'),\n ('ad_impressions', 'user_id'),\n ('user_subscriptions', 'user_id'),\n ('refresh_tokens', 'user_id'),\n # audit_logs intentionally kept for compliance\n ]\n for table, col in tables_and_columns:\n try:\n db.session.execute(\n text(f'DELETE FROM {table} WHERE {col} = :uid'),\n {'uid': user_id}\n )\n except Exception as e:\n logger.warning('Could not delete from %s (table may not exist): %s', table, e)\n # Non-fatal: table might not exist in all environments\n\n\ndef _write_tombstone(snapshot: dict):\n \"\"\"Write a PII-free tombstone record to audit_logs.\"\"\"\n try:\n stmt = text(\n \"\"\"\n INSERT INTO audit_logs (user_id, action, detail, created_at)\n VALUES (:uid, :action, :detail, :ts)\n \"\"\"\n )\n db.session.execute(stmt, {\n 'uid': snapshot['user_id'],\n 'action': 'ACCOUNT_DELETED',\n 'detail': json.dumps(snapshot),\n 'ts': datetime.now(timezone.utc),\n })\n db.session.commit()\n except Exception as e:\n logger.error('Failed to write deletion tombstone: %s', e)\n db.session.rollback()\n\n\ndef _hash_email(email: str) -> str:\n \"\"\"One-way hash of email for audit trail without storing PII.\"\"\"\n import hashlib\n return hashlib.sha256(email.lower().strip().encode()).hexdigest()\n```\n\n---\n\n### File: app/cache.py (add helper if not \ No newline at end of file From 355382d393cd9fbd24ee67a438d2fb5f9bc931d2 Mon Sep 17 00:00:00 2001 From: moon <152454724+pabloDarkmoon24@users.noreply.github.com> Date: Mon, 23 Mar 2026 16:27:32 -0500 Subject: [PATCH 4/4] fix: solution for issue #76 --- fix_issue_76.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/fix_issue_76.py b/fix_issue_76.py index 50a15ef9..6af44d5e 100644 --- a/fix_issue_76.py +++ b/fix_issue_76.py @@ -1,3 +1,3 @@ ```json { - "solution_code": "### File: app/routes/privacy.py\n\n```python\nimport io\nimport json\nimport zipfile\nimport csv\nfrom datetime import datetime, timezone\nfrom flask import Blueprint, request, jsonify, send_file\nfrom sqlalchemy import text\nfrom app.extensions import db\nfrom app.models import User, Expense, Bill, Reminder, AuditLog\nfrom app.auth import jwt_required, get_current_user_id\nfrom app.cache import invalidate_user_cache\nimport logging\n\nlogger = logging.getLogger(__name__)\n\nprivacy_bp = Blueprint('privacy', __name__, url_prefix='/user')\n\n\ndef _serialize_row(obj):\n \"\"\"Convert SQLAlchemy model instance to dict, handling dates.\"\"\"\n result = {}\n for col in obj.__table__.columns:\n val = getattr(obj, col.name)\n if isinstance(val, datetime):\n val = val.isoformat()\n result[col.name] = val\n return result\n\n\ndef _write_csv(records: list[dict]) -> str:\n \"\"\"Serialize list of dicts to CSV string.\"\"\"\n if not records:\n return \"\"\n output = io.StringIO()\n writer = csv.DictWriter(output, fieldnames=records[0].keys())\n writer.writeheader()\n writer.writerows(records)\n return output.getvalue()\n\n\n@privacy_bp.route('/export', methods=['GET'])\n@jwt_required\ndef export_user_data():\n \"\"\"\n GET /user/export\n Returns a ZIP archive containing all PII data for the authenticated user.\n Files included:\n - profile.json\n - expenses.json / expenses.csv\n - bills.json / bills.csv\n - reminders.json\n - audit_logs.json\n \"\"\"\n user_id = get_current_user_id()\n\n user = User.query.get(user_id)\n if not user or user.deleted_at is not None:\n return jsonify({'error': 'User not found'}), 404\n\n fmt = request.args.get('format', 'json').lower() # json | csv\n\n # --- Collect data ---\n expenses = Expense.query.filter_by(user_id=user_id).all()\n bills = Bill.query.filter_by(user_id=user_id).all()\n reminders = Reminder.query.filter_by(user_id=user_id).all()\n audit_logs = AuditLog.query.filter_by(user_id=user_id).order_by(AuditLog.created_at.asc()).all()\n\n profile_data = _serialize_row(user)\n # Remove sensitive hash from export\n profile_data.pop('password_hash', None)\n profile_data.pop('password', None)\n\n expenses_data = [_serialize_row(e) for e in expenses]\n bills_data = [_serialize_row(b) for b in bills]\n reminders_data = [_serialize_row(r) for r in reminders]\n audit_data = [_serialize_row(a) for a in audit_logs]\n\n # --- Build ZIP in memory ---\n zip_buffer = io.BytesIO()\n with zipfile.ZipFile(zip_buffer, 'w', zipfile.ZIP_DEFLATED) as zf:\n zf.writestr('profile.json', json.dumps(profile_data, indent=2))\n\n if fmt == 'csv':\n zf.writestr('expenses.csv', _write_csv(expenses_data))\n zf.writestr('bills.csv', _write_csv(bills_data))\n zf.writestr('reminders.csv', _write_csv(reminders_data))\n zf.writestr('audit_logs.csv', _write_csv(audit_data))\n else:\n zf.writestr('expenses.json', json.dumps(expenses_data, indent=2))\n zf.writestr('bills.json', json.dumps(bills_data, indent=2))\n zf.writestr('reminders.json', json.dumps(reminders_data, indent=2))\n zf.writestr('audit_logs.json', json.dumps(audit_data, indent=2))\n\n # Manifest\n manifest = {\n 'exported_at': datetime.now(timezone.utc).isoformat(),\n 'user_id': user_id,\n 'record_counts': {\n 'expenses': len(expenses_data),\n 'bills': len(bills_data),\n 'reminders': len(reminders_data),\n 'audit_logs': len(audit_data),\n }\n }\n zf.writestr('manifest.json', json.dumps(manifest, indent=2))\n\n zip_buffer.seek(0)\n\n # --- Audit log entry ---\n try:\n log_entry = AuditLog(\n user_id=user_id,\n action='DATA_EXPORT',\n detail=json.dumps({\n 'format': fmt,\n 'record_counts': manifest['record_counts'],\n 'ip': request.remote_addr,\n }),\n created_at=datetime.now(timezone.utc),\n )\n db.session.add(log_entry)\n db.session.commit()\n except Exception as audit_err:\n logger.error('Failed to write export audit log: %s', audit_err)\n db.session.rollback()\n\n timestamp = datetime.now(timezone.utc).strftime('%Y%m%dT%H%M%S')\n filename = f'finmind_export_{user_id}_{timestamp}.zip'\n\n return send_file(\n zip_buffer,\n mimetype='application/zip',\n as_attachment=True,\n download_name=filename,\n )\n\n\n@privacy_bp.route('/delete', methods=['DELETE'])\n@jwt_required\ndef delete_user_data():\n \"\"\"\n DELETE /user/delete\n Permanently and irreversibly deletes ALL data for the authenticated user.\n\n Body (JSON):\n {\n \"confirm\": \"DELETE MY ACCOUNT\", # required exact string\n \"reason\": \"optional free-text\"\n }\n\n Workflow:\n 1. Validate confirmation phrase.\n 2. Open a DB transaction.\n 3. Hard-delete child records (expenses, bills, reminders, refresh_tokens,\n ad_impressions, user_subscriptions).\n 4. Hard-delete the user row.\n 5. Write a tombstone audit log (kept for compliance, no PII).\n 6. Invalidate Redis caches.\n 7. Commit — any failure triggers full rollback.\n \"\"\"\n user_id = get_current_user_id()\n\n user = User.query.get(user_id)\n if not user or user.deleted_at is not None:\n return jsonify({'error': 'User not found'}), 404\n\n body = request.get_json(silent=True) or {}\n confirm_phrase = body.get('confirm', '')\n reason = body.get('reason', '')\n\n if confirm_phrase != 'DELETE MY ACCOUNT':\n return jsonify({\n 'error': 'Confirmation phrase mismatch.',\n 'hint': 'Send JSON body: {\"confirm\": \"DELETE MY ACCOUNT\"}'\n }), 422\n\n # Snapshot for audit BEFORE deletion\n snapshot = {\n 'user_id': user_id,\n 'email_hash': _hash_email(user.email),\n 'reason': reason,\n 'deleted_at': datetime.now(timezone.utc).isoformat(),\n 'ip': request.remote_addr,\n }\n\n try:\n with db.session.begin():\n # Delete child tables in FK-safe order\n _bulk_delete(user_id)\n\n # Hard delete the user\n db.session.delete(user)\n\n # Write tombstone audit log (no PII — just the hash + event)\n # This runs in its own transaction so deletion is not rolled back\n # if audit write fails (we log the error instead).\n _write_tombstone(snapshot)\n\n # Purge Redis caches\n try:\n invalidate_user_cache(user_id)\n except Exception as cache_err:\n logger.warning('Cache invalidation failed after delete: %s', cache_err)\n\n logger.info('User %s permanently deleted. Reason: %s', user_id, reason or 'none')\n return jsonify({\n 'message': 'Account and all associated data have been permanently deleted.',\n 'deleted_at': snapshot['deleted_at'],\n }), 200\n\n except Exception as exc:\n db.session.rollback()\n logger.error('User deletion failed for user_id=%s: %s', user_id, exc, exc_info=True)\n return jsonify({'error': 'Deletion failed. No data was removed. Please try again.'}), 500\n\n\n# ---------------------------------------------------------------------------\n# Helpers\n# ---------------------------------------------------------------------------\n\ndef _bulk_delete(user_id: int):\n \"\"\"Hard-delete all user-owned rows across every table.\"\"\"\n tables_and_columns = [\n ('reminders', 'user_id'),\n ('bills', 'user_id'),\n ('expenses', 'user_id'),\n ('ad_impressions', 'user_id'),\n ('user_subscriptions', 'user_id'),\n ('refresh_tokens', 'user_id'),\n # audit_logs intentionally kept for compliance\n ]\n for table, col in tables_and_columns:\n try:\n db.session.execute(\n text(f'DELETE FROM {table} WHERE {col} = :uid'),\n {'uid': user_id}\n )\n except Exception as e:\n logger.warning('Could not delete from %s (table may not exist): %s', table, e)\n # Non-fatal: table might not exist in all environments\n\n\ndef _write_tombstone(snapshot: dict):\n \"\"\"Write a PII-free tombstone record to audit_logs.\"\"\"\n try:\n stmt = text(\n \"\"\"\n INSERT INTO audit_logs (user_id, action, detail, created_at)\n VALUES (:uid, :action, :detail, :ts)\n \"\"\"\n )\n db.session.execute(stmt, {\n 'uid': snapshot['user_id'],\n 'action': 'ACCOUNT_DELETED',\n 'detail': json.dumps(snapshot),\n 'ts': datetime.now(timezone.utc),\n })\n db.session.commit()\n except Exception as e:\n logger.error('Failed to write deletion tombstone: %s', e)\n db.session.rollback()\n\n\ndef _hash_email(email: str) -> str:\n \"\"\"One-way hash of email for audit trail without storing PII.\"\"\"\n import hashlib\n return hashlib.sha256(email.lower().strip().encode()).hexdigest()\n```\n\n---\n\n### File: app/cache.py (add helper if not \ No newline at end of file + "solution_code": "### FILE: app/routes/privacy.py\n\n```python\nimport csv\nimport io\nimport json\nimport logging\nimport os\nimport zipfile\nfrom datetime import datetime, timezone\n\nfrom flask import Blueprint, Response, g, jsonify, request, stream_with_context\nfrom flask_jwt_extended import get_jwt_identity, jwt_required\n\nfrom app.db import get_db\nfrom app.tasks import export_scheduler\nfrom app.utils.audit import log_audit_event\nfrom app.utils.redis_client import delete_user_cache\n\nlogger = logging.getLogger(__name__)\n\nprivacy_bp = Blueprint('privacy', __name__, url_prefix='/users')\n\n\n# ---------------------------------------------------------------------------\n# Helper: fetch all user data from DB\n# ---------------------------------------------------------------------------\n\ndef _fetch_user_data(conn, user_id: int) -> dict:\n cur = conn.cursor()\n\n def query(sql, params):\n cur.execute(sql, params)\n cols = [d[0] for d in cur.description]\n return [dict(zip(cols, row)) for row in cur.fetchall()]\n\n profile = query(\n \"\"\"SELECT id, email, full_name, created_at, updated_at\n FROM users WHERE id = %s\"\"\",\n (user_id,)\n )\n\n expenses = query(\n \"\"\"SELECT e.id, c.name AS category, e.amount, e.description,\n e.date, e.created_at\n FROM expenses e\n LEFT JOIN categories c ON c.id = e.category_id\n WHERE e.user_id = %s\n ORDER BY e.date DESC\"\"\",\n (user_id,)\n )\n\n bills = query(\n \"\"\"SELECT id, name, amount, due_date, is_paid, paid_at, created_at\n FROM bills WHERE user_id = %s ORDER BY due_date\"\"\",\n (user_id,)\n )\n\n reminders = query(\n \"\"\"SELECT id, bill_id, remind_at, channel, sent, created_at\n FROM reminders WHERE user_id = %s ORDER BY remind_at\"\"\",\n (user_id,)\n )\n\n categories = query(\n \"\"\"SELECT id, name, budget_limit, created_at\n FROM categories WHERE user_id = %s ORDER BY name\"\"\",\n (user_id,)\n )\n\n subscriptions = query(\n \"\"\"SELECT us.id, sp.name AS plan, us.started_at, us.expires_at, us.status\n FROM user_subscriptions us\n JOIN subscription_plans sp ON sp.id = us.plan_id\n WHERE us.user_id = %s\"\"\",\n (user_id,)\n )\n\n cur.close()\n return {\n 'profile': profile,\n 'categories': categories,\n 'expenses': expenses,\n 'bills': bills,\n 'reminders': reminders,\n 'subscriptions': subscriptions,\n 'exported_at': datetime.now(timezone.utc).isoformat(),\n }\n\n\ndef _serialize_dates(obj):\n \"\"\"JSON serialiser that handles date/datetime objects.\"\"\"\n if isinstance(obj, (datetime,)):\n return obj.isoformat()\n from datetime import date\n if isinstance(obj, date):\n return obj.isoformat()\n raise TypeError(f'Type {type(obj)} not serialisable')\n\n\ndef _build_zip(data: dict) -> bytes:\n \"\"\"Pack user data into an in-memory ZIP with JSON + CSV files.\"\"\"\n buf = io.BytesIO()\n with zipfile.ZipFile(buf, 'w', zipfile.ZIP_DEFLATED) as zf:\n # master JSON\n zf.writestr(\n 'data.json',\n json.dumps(data, indent=2, default=_serialize_dates)\n )\n # per-section CSVs\n for section, rows in data.items():\n if not isinstance(rows, list) or not rows:\n continue\n csv_buf = io.StringIO()\n writer = csv.DictWriter(csv_buf, fieldnames=rows[0].keys())\n writer.writeheader()\n for row in rows:\n writer.writerow(\n {k: v.isoformat() if isinstance(v, (datetime,)) else v\n for k, v in row.items()}\n )\n zf.writestr(f'{section}.csv', csv_buf.getvalue())\n\n # human-readable README\n zf.writestr('README.txt', (\n 'FinMind — Personal Data Export\\n'\n '================================\\n'\n f'Exported at: {data[\"exported_at\"]}\\n\\n'\n 'Files:\\n'\n ' data.json — complete data set (JSON)\\n'\n ' profile.csv — account information\\n'\n ' expenses.csv — expense records\\n'\n ' bills.csv — bill records\\n'\n ' reminders.csv — reminder records\\n'\n ' categories.csv — categories\\n'\n ' subscriptions.csv — subscription history\\n\\n'\n 'To request deletion contact: privacy@finmind.app\\n'\n ))\n buf.seek(0)\n return buf.read()\n\n\n# ---------------------------------------------------------------------------\n# GET /users/export\n# ---------------------------------------------------------------------------\n\n@privacy_bp.route('/export', methods=['GET'])\n@jwt_required()\ndef export_data():\n \"\"\"\n Generate and return a ZIP archive containing all personal data\n associated with the authenticated user.\n\n Headers added for GDPR compliance.\n \"\"\"\n user_id = int(get_jwt_identity())\n conn = get_db()\n\n try:\n data = _fetch_user_data(conn, user_id)\n zip_bytes = _build_zip(data)\n except Exception as exc:\n logger.exception('Export failed for user %s', user_id)\n return jsonify({'error': 'Export generation failed', 'detail': str(exc)}), 500\n\n log_audit_event(\n conn,\n user_id=user_id,\n action='DATA_EXPORT',\n detail={\n 'records': {\n k: len(v) for k, v in data.items() if isinstance(v, list)\n },\n 'ip': request.remote_addr,\n }\n )\n conn.commit()\n\n filename = f'finmind-export-{user_id}-{datetime.now(timezone.utc).strftime(\"%Y%m%d%H%M%S\")}.zip'\n response = Response(\n zip_bytes,\n status=200,\n mimetype='application/zip',\n headers={\n 'Content-Disposition': f'attachment; filename=\"{filename}\"',\n 'Content-Length': str(len(zip_bytes)),\n # GDPR / privacy headers\n 'Cache-Control': 'no-store, no-cache, must-revalidate, private',\n 'Pragma': 'no-cache',\n 'X-Content-Type-Options': 'nosniff',\n 'X-Robots-Tag': 'none',\n }\n )\n return response\n\n\n# ---------------------------------------------------------------------------\n# DELETE /users/delete\n# ---------------------------------------------------------------------------\n\n@privacy_bp.route('/delete', methods=['DELETE'])\n@jwt_required()\ndef delete_account():\n \"\"\"\n Permanently and irreversibly delete the authenticated user's account\n and all associated data.\n\n Body (JSON):\n confirm (str, required) — must equal the string \"DELETE\"\n reason (str, optional) — reason provided by the user\n\n Workflow:\n 1. Validate confirmation token\n 2. Write audit log entry BEFORE deletion\n 3. Hard-delete all user data (cascade via FK constraints)\n 4. Purge Redis cache keys\n 5. Respond 200 — caller must discard JWT client-side\n \"\"\"\n user_id = int(get_jwt_identity())\n body = request.get_json(silent=True) or {}\n\n if body.get('confirm') != 'DELETE':\n return jsonify({\n 'error': 'Confirmation required',\n 'detail': 'Send JSON body {\"confirm\": \"DELETE\"} to proceed.'\n }), 400\n\n reason = body.get('reason', 'Not provided')\n conn = get_db()\n cur = conn.cursor()\n\n try:\n # ----------------------------------------------------------------\n # 1. Capture a snapshot for the audit record BEFORE deleting\n # ----------------------------------------------------------------\n cur.execute(\n 'SELECT email, full_name, created_at FROM users WHERE id = %s',\n (user_id,)\n )\n row = cur.fetchone()\n if not row:\n cur.close()\n return jsonify({'error': 'User not found'}), 404\n\n email, full_name, created_at = row\n\n # ----------------------------------------------------------------\n # 2. Write the audit log entry (survives user deletion because\n # audit_logs.user_id has ON DELETE SET NULL)\n # ----------------------------------------------------------------\n log_audit_event(\n conn,\n user_id=user_id,\n action='ACCOUNT_DELETE_INITIATED',\n detail={\n 'email': email,\n 'full_name': full_name,\n 'account_created_at': created_at.isoformat() if hasattr(created_at, 'isoformat') else str(created_at),\n 'deletion_reason': reason,\n 'ip': request.remote_addr,\n 'user_agent': request.user_agent.string,\n }\n )\n\n # ----------------------------------------------------------------\n # 3. Hard delete — rely on FK ON DELETE CASCADE for child tables\n # (expenses, bills, reminders, categories, tokens, etc.)\n # ----------------------------------------------------------------\n _hard_delete_user(cur, user_id)\n\n # ----------------------------------------------------------------\n # 4. Final audit record (user_id will be NULL after delete)\n # ----------------------------------------------------------------\n log_audit_event(\n conn,\n user_id=None, # user is gone\n action='ACCOUNT_DELETE_COMPLETED',\n detail={\n 'deleted_user_id': user_id,\n 'email_hash': _hash_email(email),\n 'ip': request.remote_addr,\n }\n )\n\n conn.commit()\n cur.close()\n\n except Exception as exc:\n conn.rollback()\n cur.close()\n logger.exception('Deletion failed for user %s', user_id)\n return jsonify({'error': 'Deletion failed', 'detail': str(exc)}), 500\n\n # ----------------------------------------------------------------\n # 5. Purge Redis cache (best-effort, non-fatal)\n # ----------------------------------------------------------------\n try:\n delete_user_cache(user_id)\n except Exception:\n logger.warning('Redis cache purge failed for deleted \ No newline at end of file