diff --git a/.github/workflows/daily-scraper.yml b/.github/workflows/daily-scraper.yml
index dfc2484..3448f49 100644
--- a/.github/workflows/daily-scraper.yml
+++ b/.github/workflows/daily-scraper.yml
@@ -40,15 +40,31 @@ jobs:
           type = drive
           scope = drive
           service_account_file = /tmp/service-account-key.json
-          root_folder_id = https://drive.google.com/drive/folders/1nYUczTuBjUoaSa9cucpjQU8zkEojdHBp?usp=sharing
+          # Don't use root_folder_id - we'll specify full paths instead
           EOF
           echo '${{ secrets.GOOGLE_DRIVE_SERVICE_ACCOUNT_JSON }}' > /tmp/service-account-key.json
+
       - name: Download only metadata files
         run: |
           mkdir -p data/moe/metadata
-          # Only download JSON/CSV metadata files, not PDFs
-          rclone copy gdrive:metadata data/moe/metadata -v --drive-shared-with-me --include "*.json" --include "*.csv"
+
+          # List accessible folders first
+          echo "Listing accessible Drive folders..."
+          rclone lsd gdrive: --drive-shared-with-me
+
+          # Try accessing by name path instead of ID
+          echo "Attempting to access moe_data/metadata..."
+          rclone copy "gdrive:moe_data/metadata" data/moe/metadata -v --drive-shared-with-me --include "*.json" --include "*.csv" || echo "No metadata files yet"
+
+      - name: Upload updated metadata
+        run: |
+          if [ -d "data/moe/metadata" ] && [ "$(ls -A data/moe/metadata)" ]; then
+            echo "Uploading metadata files..."
+            rclone copy data/moe/metadata "gdrive:moe_data/metadata" -v --drive-shared-with-me
+          else
+            echo "No metadata directory or files to upload"
+          fi

       - name: Install Python dependencies
@@ -59,13 +75,19 @@ jobs:
       - name: Run MoE scraper (scrape + upload directly to Drive)
         env:
-          RCLONE_REMOTE:gdrive:
+          RCLONE_REMOTE: "gdrive:moe_data/"
+          GOOGLE_DRIVE_MASTER_FOLDER_ID: ${{ secrets.GOOGLE_DRIVE_MASTER_FOLDER_ID }}
         run: |
           python backend/services/moe_scraper_service.py --all

       - name: Upload updated metadata
         run: |
-          rclone copy data/moe/metadata gdrive:metadata -v --drive-shared-with-me
+          if [ -d "data/moe/metadata" ] && [ "$(ls -A data/moe/metadata)" ]; then
+            echo "Uploading metadata files..."
+            rclone copy data/moe/metadata "gdrive:moe_data/metadata" -v --drive-shared-with-me
+          else
+            echo "No metadata directory or files to upload"
+          fi

       - name: Upload scrape report
         if: always()
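The `lsd`-then-`copy` probe above is also useful outside CI when debugging service-account visibility. A minimal Python sketch of the same probe, in the subprocess style the scraper service below uses (the `remote_is_accessible` helper and the `gdrive:moe_data` path are illustrative assumptions, not part of this PR):

```python
# Hypothetical pre-flight check: can the service account see the shared folder at all?
import subprocess

def remote_is_accessible(remote: str = "gdrive:moe_data") -> bool:
    """Return True if rclone can list the remote path shared with the service account."""
    result = subprocess.run(
        ["rclone", "lsd", remote, "--drive-shared-with-me"],
        capture_output=True, text=True,
    )
    if result.returncode != 0:
        print(f"rclone cannot reach {remote}: {result.stderr.strip()}")
    return result.returncode == 0
```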
diff --git a/backend/api/main.py b/backend/api/main.py
index dfb72f7..bd064ad 100644
--- a/backend/api/main.py
+++ b/backend/api/main.py
@@ -1,7 +1,7 @@
 from fastapi import FastAPI
 from fastapi.middleware.cors import CORSMiddleware
 from backend.core.config import get_settings
-from backend.api.routers import collections, documents, search, chat, upload, sync, scraper
+from backend.api.routers import collections, documents, search, chat, upload, sync, scraper, files_server
 from backend.api.routers.parse_marker import router as parse_marker_router

 settings = get_settings()
@@ -13,13 +13,10 @@
 )

 # CORS: allow your frontend origins in development
-# CORS: allow your frontend origins in development
+# Add CORS middleware for team access
 app.add_middleware(
     CORSMiddleware,
-    allow_origins=[
-        "http://localhost:3000",
-        "http://127.0.0.1:3000",
-    ],
+    allow_origins=["*"],  # In production, specify team member IPs/domains
     allow_credentials=True,
     allow_methods=["*"],
     allow_headers=["*"],
 )
@@ -43,6 +40,7 @@
 app.include_router(parse_marker_router, prefix="/api/marker", tags=["Marker"])
 app.include_router(sync.router, prefix="/api", tags=["Sync"])
 app.include_router(scraper.router, prefix="/api", tags=["Scraper"])
+app.include_router(files_server.router)

 @app.get("/")
 async def root():
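One caveat on the wildcard: per the CORS spec, browsers reject `Access-Control-Allow-Origin: *` on responses to credentialed requests, so `allow_origins=["*"]` together with `allow_credentials=True` will break cookie- or auth-header-based calls from the frontend. A hedged alternative that keeps the team-access intent (the `ALLOWED_ORIGINS` env var is an assumption, not something this PR defines):

```python
# Sketch: enumerate team origins via an env var instead of "*".
# ALLOWED_ORIGINS example value: "http://localhost:3000,http://192.168.1.50:3000"
import os
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware

app = FastAPI()

allowed_origins = [
    o.strip()
    for o in os.getenv("ALLOWED_ORIGINS", "http://localhost:3000").split(",")
    if o.strip()
]

app.add_middleware(
    CORSMiddleware,
    allow_origins=allowed_origins,
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)
```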
diff --git a/backend/api/routers/files_server.py b/backend/api/routers/files_server.py
new file mode 100644
index 0000000..a715c0d
--- /dev/null
+++ b/backend/api/routers/files_server.py
@@ -0,0 +1,479 @@
+"""
+File serving endpoints for team access
+Serves files from local storage via HTTP
+"""
+
+from fastapi import APIRouter, HTTPException, Response
+from fastapi.responses import FileResponse, StreamingResponse
+from pathlib import Path
+from typing import Optional, List
+import mimetypes
+import zipfile
+import io
+from datetime import datetime
+
+from backend.services.local_storage_service import (
+    get_file_path,
+    get_file_info,
+    list_files_in_category,
+    get_all_categories
+)
+from backend.services.mongodb_service import (
+    get_document_by_hash,
+    find_documents
+)
+
+router = APIRouter(prefix="/api/files", tags=["files"])
+
+
+@router.get("/categories")
+async def get_categories():
+    """Get all available categories"""
+    categories = get_all_categories()
+    return {
+        "categories": categories,
+        "count": len(categories)
+    }
+
+
+@router.get("/category/{category}")
+async def list_category_files(category: str):
+    """List all files in a category"""
+    try:
+        files = list_files_in_category(category)
+
+        file_list = []
+        for file_path in files:
+            if file_path.is_file():
+                info = get_file_info(file_path)
+                file_list.append({
+                    "filename": file_path.name,
+                    "size": info["size"],
+                    "hash": info["hash"],
+                    "modified_time": info["modified_time"],
+                    "category": category,
+                    "download_url": f"/api/files/download/{category}/{file_path.name}"
+                })
+
+        return {
+            "category": category,
+            "files": file_list,
+            "count": len(file_list)
+        }
+
+    except Exception as e:
+        raise HTTPException(status_code=404, detail=f"Category not found: {str(e)}")
+
+
+@router.get("/download/{category}/{filename}")
+async def download_file(category: str, filename: str):
+    """
+    Download file from local storage
+    Team members can access this endpoint to get files
+    """
+    try:
+        file_path = get_file_path(category, filename)
+
+        if not file_path.exists():
+            raise HTTPException(status_code=404, detail="File not found")
+
+        # Determine MIME type
+        mime_type, _ = mimetypes.guess_type(str(file_path))
+        if mime_type is None:
+            mime_type = "application/octet-stream"
+
+        return FileResponse(
+            path=str(file_path),
+            media_type=mime_type,
+            filename=filename,
+            headers={
+                "Content-Disposition": f'attachment; filename="{filename}"'
+            }
+        )
+
+    except HTTPException:
+        # Let the 404 above propagate instead of being rewrapped as a 500
+        raise
+    except Exception as e:
+        raise HTTPException(status_code=500, detail=f"Error downloading file: {str(e)}")
+
+
+@router.get("/view/{category}/{filename}")
+async def view_file(category: str, filename: str):
+    """
+    View/preview file in browser (for PDFs, images, etc.)
+    Opens in browser instead of downloading
+    """
+    try:
+        file_path = get_file_path(category, filename)
+
+        if not file_path.exists():
+            raise HTTPException(status_code=404, detail="File not found")
+
+        mime_type, _ = mimetypes.guess_type(str(file_path))
+        if mime_type is None:
+            mime_type = "application/octet-stream"
+
+        return FileResponse(
+            path=str(file_path),
+            media_type=mime_type,
+            filename=filename,
+            headers={
+                "Content-Disposition": f'inline; filename="{filename}"'
+            }
+        )
+
+    except HTTPException:
+        raise
+    except Exception as e:
+        raise HTTPException(status_code=500, detail=f"Error viewing file: {str(e)}")
+
+
+@router.get("/stream/{category}/{filename}")
+async def stream_file(category: str, filename: str):
+    """
+    Stream large files in chunks
+    Better for large PDFs or videos
+    """
+    try:
+        file_path = get_file_path(category, filename)
+
+        if not file_path.exists():
+            raise HTTPException(status_code=404, detail="File not found")
+
+        def iterfile():
+            with open(file_path, mode="rb") as file:
+                while chunk := file.read(1024 * 1024):  # 1MB chunks
+                    yield chunk
+
+        mime_type, _ = mimetypes.guess_type(str(file_path))
+        if mime_type is None:
+            mime_type = "application/octet-stream"
+
+        return StreamingResponse(
+            iterfile(),
+            media_type=mime_type,
+            headers={
+                "Content-Disposition": f'inline; filename="{filename}"'
+            }
+        )
+
+    except HTTPException:
+        raise
+    except Exception as e:
+        raise HTTPException(status_code=500, detail=f"Error streaming file: {str(e)}")
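On the client side, the `/stream` endpoint pairs with `requests`' chunked reads so large PDFs never sit fully in memory (a usage sketch; host, port, and the `research/paper.pdf` path are placeholders):

```python
# Sketch: consume the streaming endpoint chunk by chunk.
import requests

url = "http://localhost:8000/api/files/stream/research/paper.pdf"
with requests.get(url, stream=True, timeout=60) as resp:
    resp.raise_for_status()
    with open("paper.pdf", "wb") as out:
        for chunk in resp.iter_content(chunk_size=1024 * 1024):  # mirror the server's 1 MB chunks
            out.write(chunk)
```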
+
+
+@router.get("/info/{category}/{filename}")
+async def get_file_metadata(category: str, filename: str):
+    """
+    Get file metadata without downloading
+    """
+    try:
+        file_path = get_file_path(category, filename)
+
+        if not file_path.exists():
+            raise HTTPException(status_code=404, detail="File not found")
+
+        info = get_file_info(file_path)
+
+        # Get from MongoDB if exists
+        mongo_doc = get_document_by_hash(info["hash"])
+
+        return {
+            "filename": filename,
+            "category": category,
+            "size": info["size"],
+            "hash": info["hash"],
+            "modified_time": info["modified_time"],
+            "download_url": f"/api/files/download/{category}/{filename}",
+            "view_url": f"/api/files/view/{category}/{filename}",
+            "mongodb_metadata": mongo_doc if mongo_doc else None
+        }
+
+    except HTTPException:
+        raise
+    except Exception as e:
+        raise HTTPException(status_code=500, detail=f"Error getting file info: {str(e)}")
+
+
+@router.get("/search")
+async def search_files(
+    query: Optional[str] = None,
+    category: Optional[str] = None,
+    min_size: Optional[int] = None,
+    max_size: Optional[int] = None
+):
+    """
+    Search files across all categories or a specific category
+    """
+    try:
+        categories = [category] if category else get_all_categories()
+        results = []
+
+        for cat in categories:
+            files = list_files_in_category(cat)
+
+            for file_path in files:
+                if not file_path.is_file():
+                    continue
+
+                # Apply filters
+                if query and query.lower() not in file_path.name.lower():
+                    continue
+
+                info = get_file_info(file_path)
+
+                if min_size and info["size"] < min_size:
+                    continue
+                if max_size and info["size"] > max_size:
+                    continue
+
+                results.append({
+                    "filename": file_path.name,
+                    "category": cat,
+                    "size": info["size"],
+                    "hash": info["hash"],
+                    "modified_time": info["modified_time"],
+                    "download_url": f"/api/files/download/{cat}/{file_path.name}",
+                    "view_url": f"/api/files/view/{cat}/{file_path.name}"
+                })
+
+        return {
+            "results": results,
+            "count": len(results)
+        }
+
+    except Exception as e:
+        raise HTTPException(status_code=500, detail=f"Error searching files: {str(e)}")
+
+
+@router.get("/bulk-download")
+async def bulk_download_all_pdfs(
+    categories: Optional[str] = None,  # Comma-separated categories
+    format: str = "zip"  # Future: support tar, etc.
+):
+    """
+    Bulk download all PDFs as a ZIP file
+
+    Args:
+        categories: Optional comma-separated list of categories (e.g., "research,reports")
+                    If not provided, downloads from all categories
+        format: Archive format (currently only 'zip' supported)
+
+    Returns:
+        ZIP file containing all PDFs organized by category
+
+    Example:
+        /api/files/bulk-download
+        /api/files/bulk-download?categories=research,reports
+    """
+    try:
+        # Determine which categories to include
+        if categories:
+            category_list = [c.strip() for c in categories.split(",")]
+        else:
+            category_list = get_all_categories()
+
+        # Create in-memory ZIP file
+        zip_buffer = io.BytesIO()
+
+        total_files = 0
+        total_size = 0
+
+        with zipfile.ZipFile(zip_buffer, 'w', zipfile.ZIP_DEFLATED) as zip_file:
+            for category in category_list:
+                try:
+                    files = list_files_in_category(category, pattern="*.pdf")
+
+                    for file_path in files:
+                        if not file_path.is_file():
+                            continue
+
+                        # Add file to ZIP with category folder structure
+                        # data/local_storage/category/filename.pdf
+                        arcname = f"data/local_storage/{category}/{file_path.name}"
+
+                        zip_file.write(file_path, arcname=arcname)
+
+                        total_files += 1
+                        total_size += file_path.stat().st_size
+
+                except Exception as e:
+                    print(f"Warning: Error processing category {category}: {e}")
+                    continue
+
+            # Record a summary as the ZIP archive comment (must be set before the archive closes)
+            zip_file.comment = (
+                f"Bulk download - {total_files} files, {total_size / 1024 / 1024:.2f} MB"
+            ).encode("utf-8")
+
+        # Prepare ZIP for download
+        zip_buffer.seek(0)
+
+        # Generate filename with timestamp
+        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
+        filename = f"pdfs_bulk_download_{timestamp}.zip"
+
+        return StreamingResponse(
+            iter([zip_buffer.getvalue()]),
+            media_type="application/zip",
+            headers={
+                "Content-Disposition": f'attachment; filename="{filename}"',
+                "X-Total-Files": str(total_files),
+                "X-Total-Size": str(total_size),
+                "X-Categories": ",".join(category_list)
+            }
+        )
+
+    except Exception as e:
+        raise HTTPException(
+            status_code=500,
+            detail=f"Error creating bulk download: {str(e)}"
+        )
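Since the archive is assembled in memory server-side, a sensible client flow is to ask `/bulk-download-info` (defined later in this file) for the expected size before pulling the ZIP. A usage sketch with placeholder host and categories:

```python
# Sketch: check size first, then download and read the custom headers.
import requests

base = "http://localhost:8000/api/files"
params = {"categories": "research,reports"}

info = requests.get(f"{base}/bulk-download-info", params=params, timeout=30).json()
print(f"{info['total_files']} files, ~{info['total_size_mb']} MB")

resp = requests.get(f"{base}/bulk-download", params=params, timeout=600)
resp.raise_for_status()
print(resp.headers.get("X-Total-Files"), "files in archive")
with open("pdfs.zip", "wb") as f:
    f.write(resp.content)
```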
+
+
+@router.get("/bulk-download-by-category/{category}")
+async def bulk_download_category(category: str):
+    """
+    Download all PDFs from a specific category as ZIP
+
+    Example:
+        /api/files/bulk-download-by-category/research
+    """
+    try:
+        files = list_files_in_category(category, pattern="*.pdf")
+
+        if not files:
+            raise HTTPException(
+                status_code=404,
+                detail=f"No PDF files found in category: {category}"
+            )
+
+        # Create ZIP
+        zip_buffer = io.BytesIO()
+
+        with zipfile.ZipFile(zip_buffer, 'w', zipfile.ZIP_DEFLATED) as zip_file:
+            for file_path in files:
+                if not file_path.is_file():
+                    continue
+
+                # Structure: data/local_storage/category/filename.pdf
+                arcname = f"data/local_storage/{category}/{file_path.name}"
+                zip_file.write(file_path, arcname=arcname)
+
+        zip_buffer.seek(0)
+
+        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
+        filename = f"{category}_pdfs_{timestamp}.zip"
+
+        return StreamingResponse(
+            iter([zip_buffer.getvalue()]),
+            media_type="application/zip",
+            headers={
+                "Content-Disposition": f'attachment; filename="{filename}"'
+            }
+        )
+
+    except HTTPException:
+        # Preserve the 404 raised above
+        raise
+    except Exception as e:
+        raise HTTPException(
+            status_code=500,
+            detail=f"Error downloading category: {str(e)}"
+        )
+
+
+@router.post("/bulk-download-selected")
+async def bulk_download_selected_files(file_list: List[dict]):
+    """
+    Download specific files as ZIP
+
+    Request body:
+        [
+            {"category": "research", "filename": "paper1.pdf"},
+            {"category": "reports", "filename": "report2.pdf"}
+        ]
+    """
+    try:
+        if not file_list:
+            raise HTTPException(status_code=400, detail="File list is empty")
+
+        zip_buffer = io.BytesIO()
+
+        with zipfile.ZipFile(zip_buffer, 'w', zipfile.ZIP_DEFLATED) as zip_file:
+            for item in file_list:
+                category = item.get("category")
+                filename = item.get("filename")
+
+                if not category or not filename:
+                    continue
+
+                try:
+                    file_path = get_file_path(category, filename)
+
+                    if file_path.exists() and file_path.is_file():
+                        arcname = f"data/local_storage/{category}/{filename}"
+                        zip_file.write(file_path, arcname=arcname)
+
+                except Exception as e:
+                    print(f"Warning: Could not add {category}/{filename}: {e}")
+                    continue
+
+        zip_buffer.seek(0)
+
+        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
+        zip_name = f"selected_pdfs_{timestamp}.zip"
+
+        return StreamingResponse(
+            iter([zip_buffer.getvalue()]),
+            media_type="application/zip",
+            headers={
+                "Content-Disposition": f'attachment; filename="{zip_name}"'
+            }
+        )
+
+    except HTTPException:
+        # Preserve the 400 raised above
+        raise
+    except Exception as e:
+        raise HTTPException(
+            status_code=500,
+            detail=f"Error downloading selected files: {str(e)}"
+        )
+
+
+@router.get("/bulk-download-info")
+async def get_bulk_download_info(categories: Optional[str] = None):
+    """
+    Get information about a bulk download without downloading it
+    Shows how many files and the total size
+    """
+    try:
+        if categories:
+            category_list = [c.strip() for c in categories.split(",")]
+        else:
+            category_list = get_all_categories()
+
+        info = {
+            "categories": [],
+            "total_files": 0,
+            "total_size_bytes": 0,
+            "total_size_mb": 0
+        }
+
+        for category in category_list:
+            files = list_files_in_category(category, pattern="*.pdf")
+
+            category_size = 0
+            file_count = 0
+
+            for file_path in files:
+                if file_path.is_file():
+                    file_count += 1
+                    category_size += file_path.stat().st_size
+
+            info["categories"].append({
+                "name": category,
+                "file_count": file_count,
+                "size_bytes": category_size,
+                "size_mb": round(category_size / 1024 / 1024, 2)
+            })
+
+            info["total_files"] += file_count
+            info["total_size_bytes"] += category_size
+
+        info["total_size_mb"] = round(info["total_size_bytes"] / 1024 / 1024, 2)
+        info["estimated_zip_size_mb"] = round(info["total_size_mb"] * 0.95, 2)  # PDFs don't compress much
+
+        return info
+
+    except Exception as e:
+        raise HTTPException(
+            status_code=500,
+            detail=f"Error getting download info: {str(e)}"
+        )
\ No newline at end of file
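For completeness, the selected-files variant above takes a JSON array in the POST body; a usage sketch (filenames and host are placeholders):

```python
# Sketch: fetch two specific PDFs as one ZIP via the POST endpoint.
import requests

payload = [
    {"category": "research", "filename": "paper1.pdf"},
    {"category": "reports", "filename": "report2.pdf"},
]
resp = requests.post(
    "http://localhost:8000/api/files/bulk-download-selected",
    json=payload, timeout=300,
)
resp.raise_for_status()
with open("selected.zip", "wb") as f:
    f.write(resp.content)
```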
diff --git a/backend/services/moe_scraper_service.py b/backend/services/moe_scraper_service.py
index 760fdbe..c700e44 100644
--- a/backend/services/moe_scraper_service.py
+++ b/backend/services/moe_scraper_service.py
@@ -1,24 +1,9 @@
 # backend/services/moe_scraper_service.py
 """
-Ministry of Education Web Scraper Service
+Ministry of Education Web Scraper Service (Rclone-only version)

-This service incrementally scrapes MoE websites and uploads new content to Google Drive.
-It checks for updates in existing folders and only scrapes/uploads new content.
-
-Designed to work with existing Google Drive folder structure:
-- moe_scraped_higher_edu_RUSA
-- Scraped_moe_archived_advertisment
-- scraped_moe_archived_circulars
-- Scraped_moe_archived_press_releases
-- Scraped_moe_archived_scholarships
-- scraped_moe_archived_updates
-- scraped_moe_documents&reports
-- scraped_moe_higher_education_schemes
-- scraped_moe_mothly_achivements
-- scraped_moe_rti
-- scraped_moe_schemes
-- scraped_moe_statistics
+This service scrapes MoE websites and uploads new content to Google Drive using rclone.
 """

 import requests
@@ -26,17 +11,12 @@
 from urllib.parse import urljoin, urlparse
 from pathlib import Path
 import time
-import hashlib
-from typing import List, Dict, Optional, Set, Tuple
+from typing import List, Dict, Optional, Set
 from datetime import datetime
 import json
 import os
 from dotenv import load_dotenv
-
-from google.oauth2 import service_account
-from googleapiclient.discovery import build
-from googleapiclient.http import MediaFileUpload, MediaIoBaseUpload
-import io
+import subprocess

 # Load environment
 load_dotenv()
@@ -44,7 +24,7 @@

 class MoEScraperService:
     """
-    Incremental MoE scraper that checks for updates and uploads only new content
+    MoE scraper that uses rclone for Google Drive operations
     """

     # Mapping of Drive folders to their source URLs and scraping config
@@ -99,30 +79,19 @@ class MoEScraperService:
         }
     }

-    def __init__(self, drive_service, master_folder_id: str):
-        """
-        Initialize scraper service
-
-        Args:
-            drive_service: Authenticated Google Drive API service
-            master_folder_id: Google Drive master folder ID containing all scrape folders
-        """
-        self.drive_service = drive_service
-        self.master_folder_id = master_folder_id
+    def __init__(self):
+        """Initialize scraper service with rclone"""
+        # Rclone setup
+        self.rclone_remote = os.getenv('RCLONE_REMOTE', 'gdrive:')
+        self.master_folder_id = os.getenv('GOOGLE_DRIVE_MASTER_FOLDER_ID')

-        # Detect and cache the Shared Drive ID
-        self.shared_drive_id = self._get_shared_drive_id()
-
-        # Cache of existing files in Drive (folder_name -> set of filenames)
+        # Cache of existing files (folder_name -> set of filenames)
         self.existing_files_cache: Dict[str, Set[str]] = {}

-        # Folder ID cache (folder_name -> drive_folder_id)
-        self.folder_id_cache: Dict[str, str] = {}
-
         # HTTP session
         self.session = requests.Session()
         self.session.headers.update({
-            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36'
+            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36'
         })

         # Stats
@@ -135,111 +104,95 @@ def __init__(self, drive_service, master_folder_id: str):
             "errors": []
         }

-    def _get_shared_drive_id(self) -> Optional[str]:
-        """
-        Get the Shared Drive ID for the master folder
-
-        Returns None if folder is in My Drive (won't work with service accounts)
-        """
+    def _rclone_path(self, *parts) -> str:
+        """Build rclone path from folder ID and subpaths"""
+        if self.master_folder_id:
+            # Use direct folder ID access
+            base = f"{self.rclone_remote}{self.master_folder_id}"
+            if parts:
+                return f"{base}/{'/'.join(parts)}"
+            return base
+        else:
+            # Fallback to named path
+            return f"{self.rclone_remote}{'/'.join(parts)}"
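A note on the ID branch of `_rclone_path`: rclone resolves path segments by *name*, so `gdrive:<folder-id>/pdfs` looks for a directory literally named after the ID rather than opening the folder by ID. If ID-based access is the goal, the drive backend's root-folder option is the usual route (a hedged sketch, not what this PR implements; `list_folder_by_id` is a hypothetical helper):

```python
# Sketch: address a Drive folder by ID with --drive-root-folder-id instead of
# splicing the ID into the path.
import subprocess
from typing import List

def list_folder_by_id(master_folder_id: str, subpath: str) -> List[str]:
    result = subprocess.run(
        ["rclone", "lsf", f"gdrive:{subpath}", "--drive-root-folder-id", master_folder_id],
        capture_output=True, text=True,
    )
    return [line for line in result.stdout.splitlines() if line]
```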
+
+    def get_existing_files_in_folder(self, folder_name: str) -> Set[str]:
+        """Get set of existing filenames in a Drive folder via rclone"""
         try:
-            file_meta = self.drive_service.files().get(
-                fileId=self.master_folder_id,
-                fields='id, name, driveId',
-                supportsAllDrives=True
-            ).execute()
+            path = self._rclone_path("pdfs", folder_name)
+            result = subprocess.run([
+                "rclone", "lsf",
+                path,
+                "--drive-shared-with-me"
+            ], capture_output=True, text=True)
+
+            if result.returncode == 0:
+                files = set(result.stdout.strip().split('\n'))
+                filenames = {f for f in files if f}  # Remove empty strings
+                print(f"  Found {len(filenames)} existing files in folder")
+                return filenames
+            return set()
+        except Exception as e:
+            print(f"Error listing files via rclone: {e}")
+            return set()
+
+    def download_pdf(self, url: str, filename: str) -> Optional[bytes]:
+        """Download PDF file"""
+        try:
+            print(f"  Downloading: {filename}")
+            response = self.session.get(url, timeout=60, stream=True)
+            response.raise_for_status()

-            drive_id = file_meta.get('driveId')
-            folder_name = file_meta.get('name', 'Unknown')
+            pdf_bytes = response.content

-            if drive_id:
-                print(f"✓ Detected Shared Drive: {folder_name}")
-                print(f"  Drive ID: {drive_id}")
-            else:
-                print(f"⚠️ WARNING: Folder '{folder_name}' is in My Drive, not a Shared Drive!")
-                print(f"  Service accounts cannot upload to My Drive.")
-                print(f"  Please use a folder within a Shared Drive.")
+            # Verify it's actually a PDF
+            if not pdf_bytes.startswith(b'%PDF'):
+                print(f"  Warning: File doesn't appear to be a PDF, skipping")
+                return None

-            return drive_id
+            print(f"  Downloaded {len(pdf_bytes)} bytes")
+            return pdf_bytes

         except Exception as e:
-            print(f"Error detecting Shared Drive: {e}")
+            print(f"  Error downloading {url}: {e}")
+            self.stats["errors"].append(f"Download error ({filename}): {str(e)}")
             return None

-    def get_folder_id(self, folder_name: str) -> Optional[str]:
-        """
-        Get Google Drive folder ID by name
-
-        Caches results to avoid repeated API calls
-        """
-        if folder_name in self.folder_id_cache:
-            return self.folder_id_cache[folder_name]
-
+    def upload_to_drive(self, pdf_bytes: bytes, filename: str, folder_name: str) -> Optional[str]:
+        """Upload PDF bytes to Google Drive via rclone"""
         try:
-            query = f"name='{folder_name}' and '{self.master_folder_id}' in parents and mimeType='application/vnd.google-apps.folder' and trashed=false"
-
-            results = self.drive_service.files().list(
-                q=query,
-                fields='files(id, name)',
-                supportsAllDrives=True,
-                includeItemsFromAllDrives=True
-            ).execute()
-
-            files = results.get('files', [])
-
-            if files:
-                folder_id = files[0]['id']
-                self.folder_id_cache[folder_name] = folder_id
-                return folder_id
+            # Save temporarily
+            temp_path = Path(f"data/moe/temp/{filename}")
+            temp_path.parent.mkdir(parents=True, exist_ok=True)
+            temp_path.write_bytes(pdf_bytes)
+
+            # Upload via rclone
+            remote_path = self._rclone_path("pdfs", folder_name, filename)
+            result = subprocess.run([
+                "rclone", "copyto",
+                str(temp_path),
+                remote_path,
+                "--drive-shared-with-me"
+            ], capture_output=True, text=True)
+
+            # Clean up temp file
+            temp_path.unlink()
+
+            if result.returncode == 0:
+                print(f"  ✓ Uploaded via rclone: {filename}")
+                return filename
             else:
-                print(f"Warning: Folder '{folder_name}' not found in Drive")
+                print(f"  ⚠️ Rclone upload failed: {result.stderr}")
+                self.stats["errors"].append(f"Rclone upload error ({filename}): {result.stderr}")
                 return None

         except Exception as e:
-            print(f"Error getting folder ID for '{folder_name}': {e}")
+            print(f"  Error uploading {filename} via rclone: {e}")
+            self.stats["errors"].append(f"Upload error ({filename}): {str(e)}")
             return None
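`upload_to_drive` round-trips each PDF through a temp file only so `rclone copyto` has a source path; `rclone rcat`, which writes stdin to a remote object, would skip the disk entirely. A hedged alternative sketch (not what this PR implements):

```python
# Sketch: stream bytes straight to Drive via `rclone rcat` (reads from stdin).
import subprocess

def upload_bytes(pdf_bytes: bytes, remote_path: str) -> bool:
    result = subprocess.run(
        ["rclone", "rcat", remote_path, "--drive-shared-with-me"],
        input=pdf_bytes, capture_output=True,
    )
    if result.returncode != 0:
        print(f"rcat failed: {result.stderr.decode(errors='replace')}")
    return result.returncode == 0
```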
-
-    def get_existing_files_in_folder(self, folder_id: str) -> Set[str]:
-        """
-        Get set of existing filenames in a Drive folder
-
-        Used to avoid re-uploading duplicates
-        """
-        try:
-            query = f"'{folder_id}' in parents and trashed=false"
-
-            files = []
-            page_token = None
-
-            while True:
-                results = self.drive_service.files().list(
-                    q=query,
-                    fields='nextPageToken, files(name)',
-                    pageToken=page_token,
-                    supportsAllDrives=True,
-                    includeItemsFromAllDrives=True
-                ).execute()
-
-                files.extend(results.get('files', []))
-                page_token = results.get('nextPageToken')
-
-                if not page_token:
-                    break
-
-            filenames = {f['name'] for f in files}
-            print(f"  Found {len(filenames)} existing files in folder")
-            return filenames
-
-        except Exception as e:
-            print(f"Error listing files in folder: {e}")
-            return set()
-
     def find_pdf_links(self, url: str) -> List[Dict[str, str]]:
-        """
-        Find all PDF download links on a webpage
-
-        Returns:
-            List of dicts with 'url', 'title', 'filename' for each PDF
-        """
+        """Find all PDF download links on a webpage"""
         try:
             print(f"  Scraping: {url}")
             response = self.session.get(url, timeout=30)
@@ -250,30 +203,24 @@ def find_pdf_links(self, url: str) -> List[Dict[str, str]]:
             pdf_links = []
             seen_urls = set()

-            # Find all links
             for link in soup.find_all('a', href=True):
                 href = link['href']

                 # Check if it's a PDF link
                 if href.lower().endswith('.pdf') or '/pdf/' in href.lower() or 'download' in href.lower():
-                    # Make absolute URL
                     absolute_url = urljoin(url, href)

-                    # Skip duplicates
                     if absolute_url in seen_urls:
                         continue
                     seen_urls.add(absolute_url)

-                    # Get link text/title
                     link_text = link.get_text(strip=True)
                     title = link.get('title', link_text)

-                    # Generate filename from URL or title
                     parsed = urlparse(absolute_url)
                     filename = Path(parsed.path).name

                     if not filename or filename == '':
-                        # Generate from title or hash
                         filename = self._sanitize_filename(title or 'document') + '.pdf'

                     if not filename.lower().endswith('.pdf'):
@@ -295,119 +242,32 @@ def find_pdf_links(self, url: str) -> List[Dict[str, str]]:

     def _sanitize_filename(self, filename: str) -> str:
         """Clean filename for safe storage"""
-        # Remove invalid characters
         invalid_chars = '<>:"/\\|?*'
         for char in invalid_chars:
             filename = filename.replace(char, '_')

-        # Limit length
         if len(filename) > 200:
             filename = filename[:200]

         return filename.strip()

-    def download_pdf(self, url: str, filename: str) -> Optional[bytes]:
-        """
-        Download PDF file
-
-        Returns:
-            PDF bytes, or None if failed
-        """
-        try:
-            print(f"  Downloading: {filename}")
-            response = self.session.get(url, timeout=60, stream=True)
-            response.raise_for_status()
-
-            # Read content
-            pdf_bytes = response.content
-
-            # Verify it's actually a PDF (check magic bytes)
-            if not pdf_bytes.startswith(b'%PDF'):
-                print(f"  Warning: File doesn't appear to be a PDF, skipping")
-                return None
-
-            print(f"  Downloaded {len(pdf_bytes)} bytes")
-            return pdf_bytes
-
-        except Exception as e:
-            print(f"  Error downloading {url}: {e}")
-            self.stats["errors"].append(f"Download error ({filename}): {str(e)}")
-            return None
-
-    def upload_to_drive(self, pdf_bytes: bytes, filename: str, folder_id: str) -> Optional[str]:
-        """
-        Upload PDF bytes to Google Drive folder
-
-        Returns:
-            Google Drive file ID, or None if failed
-        """
-        try:
-            # Prepare file metadata
-            file_metadata = {
-                'name': filename,
-                'parents': [folder_id]
-            }
-
-            # Create media upload from bytes
-            media = MediaIoBaseUpload(
-                io.BytesIO(pdf_bytes),
-                mimetype='application/pdf',
-                resumable=True
-            )
-
-            # Upload
-            file = self.drive_service.files().create(
-                body=file_metadata,
-                media_body=media,
-                fields='id, name, size, modifiedTime',
-                supportsAllDrives=True
-            ).execute()
-
-            file_id = file.get('id')
-            print(f"  ✓ Uploaded to Drive: {filename} (ID: {file_id})")
-
-            return file_id
-
-        except Exception as e:
-            print(f"  Error uploading {filename} to Drive: {e}")
-            self.stats["errors"].append(f"Upload error ({filename}): {str(e)}")
-            return None
-
     def scrape_folder(self, folder_name: str, config: Dict) -> Dict:
-        """
-        Scrape a single MoE page and upload new PDFs to Drive folder
-
-        Args:
-            folder_name: Name of Google Drive folder
-            config: Scraping configuration (url, description)
-
-        Returns:
-            Dict with scraping statistics
-        """
+        """Scrape a single folder configuration"""
         print(f"\n{'='*60}")
         print(f"Scraping: {folder_name}")
         print(f"URL: {config['url']}")
         print(f"Description: {config['description']}")
         print(f"{'='*60}")

-        # Get folder ID
-        folder_id = self.get_folder_id(folder_name)
-
-        if not folder_id:
-            return {
-                "folder_name": folder_name,
-                "status": "error",
-                "error": "Folder not found in Drive"
-            }
+        self.stats["folders_checked"] += 1

         # Get existing files in folder
-        existing_files = self.get_existing_files_in_folder(folder_id)
+        existing_files = self.get_existing_files_in_folder(folder_name)

         # Find PDF links on page
         pdf_links = self.find_pdf_links(config['url'])
         self.stats["pdfs_found"] += len(pdf_links)

-        # Track results for this folder
         folder_stats = {
             "folder_name": folder_name,
             "pdfs_found": len(pdf_links),
@@ -421,7 +281,6 @@ def scrape_folder(self, folder_name: str, config: Dict) -> Dict:
         for i, pdf_info in enumerate(pdf_links, 1):
             pdf_url = pdf_info['url']
             filename = pdf_info['filename']
-            title = pdf_info['title']

             print(f"\n  [{i}/{len(pdf_links)}] {filename}")
@@ -443,18 +302,15 @@ def scrape_folder(self, folder_name: str, config: Dict) -> Dict:
                 continue

             # Upload to Drive
-            file_id = self.upload_to_drive(pdf_bytes, filename, folder_id)
+            result = self.upload_to_drive(pdf_bytes, filename, folder_name)

-            if file_id:
+            if result:
                 folder_stats["pdfs_uploaded"] += 1
                 self.stats["pdfs_uploaded"] += 1
-
-                # Add to existing files cache
                 existing_files.add(filename)
             else:
                 folder_stats["errors"].append(f"Failed to upload: {filename}")

-            # Be nice to the server
             time.sleep(2)

         # Summary for this folder
@@ -468,12 +324,7 @@ def scrape_folder(self, folder_name: str, config: Dict) -> Dict:
         return folder_stats

     def scrape_all_folders(self) -> Dict:
-        """
-        Scrape all configured MoE pages and upload to respective Drive folders
-
-        Returns:
-            Combined statistics for all folders
-        """
+        """Scrape all configured MoE pages"""
         start_time = time.time()

         print(f"\n{'='*60}")
@@ -484,12 +335,8 @@ def scrape_all_folders(self) -> Dict:
         folder_results = []

         for folder_name, config in self.SCRAPE_CONFIG.items():
-            self.stats["folders_checked"] += 1
-
             result = self.scrape_folder(folder_name, config)
             folder_results.append(result)
-
-            # Brief pause between folders
             time.sleep(3)

         duration = time.time() - start_time
@@ -503,11 +350,10 @@ def scrape_all_folders(self) -> Dict:
         print(f"Total PDFs found: {self.stats['pdfs_found']}")
         print(f"New PDFs: {self.stats['pdfs_new']}")
         print(f"PDFs uploaded: {self.stats['pdfs_uploaded']}")
-        print(f"PDFs skipped (already exist): {self.stats['pdfs_skipped']}")
+        print(f"PDFs skipped: {self.stats['pdfs_skipped']}")
         print(f"Total errors: {len(self.stats['errors'])}")
         print(f"{'='*60}\n")

-        # Prepare final report
         report = {
             "timestamp": datetime.now().isoformat(),
             "duration_seconds": duration,
@@ -518,15 +364,7 @@ def scrape_all_folders(self) -> Dict:
         return report

     def scrape_specific_folders(self, folder_names: List[str]) -> Dict:
-        """
-        Scrape only specific folders
-
-        Args:
-            folder_names: List of folder names to scrape
-
-        Returns:
-            Statistics for scraped folders
-        """
+        """Scrape only specific folders"""
         start_time = time.time()
         folder_results = []

@@ -536,11 +374,8 @@ def scrape_specific_folders(self, folder_names: List[str]) -> Dict:
                 continue

             config = self.SCRAPE_CONFIG[folder_name]
-            self.stats["folders_checked"] += 1
-
             result = self.scrape_folder(folder_name, config)
             folder_results.append(result)
-
             time.sleep(3)

         duration = time.time() - start_time
@@ -556,29 +391,17 @@ def scrape_specific_folders(self, folder_names: List[str]) -> Dict:

 def create_scraper_service() -> MoEScraperService:
-    """
-    Factory function to create authenticated MoE scraper service
-
-    Reads credentials from environment variables
-    """
-    service_account_file = os.getenv("GOOGLE_DRIVE_SERVICE_ACCOUNT_FILE")
+    """Factory function to create MoE scraper service"""
+    rclone_remote = os.getenv("RCLONE_REMOTE", "gdrive:")
     master_folder_id = os.getenv("GOOGLE_DRIVE_MASTER_FOLDER_ID")

-    if not service_account_file:
-        raise ValueError("GOOGLE_DRIVE_SERVICE_ACCOUNT_FILE environment variable not set")
-
     if not master_folder_id:
         raise ValueError("GOOGLE_DRIVE_MASTER_FOLDER_ID environment variable not set")

-    # Authenticate with Google Drive
-    scopes = ['https://www.googleapis.com/auth/drive']
-    creds = service_account.Credentials.from_service_account_file(
-        service_account_file, scopes=scopes
-    )
-
-    drive_service = build('drive', 'v3', credentials=creds)
+    print(f"Using rclone mode: {rclone_remote}")
+    print(f"Master folder ID: {master_folder_id}")

-    return MoEScraperService(drive_service, master_folder_id)
+    return MoEScraperService()


 # CLI interface
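Beyond the CLI entry point, the factory can be driven directly for ad-hoc runs, provided `RCLONE_REMOTE` and `GOOGLE_DRIVE_MASTER_FOLDER_ID` are set as in the workflow (a sketch; `scraped_moe_rti` is assumed to be one of the `SCRAPE_CONFIG` keys, per the folder list this PR removes from the docstring):

```python
# Sketch: run the scraper for a single configured folder.
from backend.services.moe_scraper_service import create_scraper_service

scraper = create_scraper_service()
report = scraper.scrape_specific_folders(["scraped_moe_rti"])
print(report)
```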
diff --git a/backend/services/mongodb_service.py b/backend/services/mongodb_service.py
index d4c3d85..dd5a35c 100644
--- a/backend/services/mongodb_service.py
+++ b/backend/services/mongodb_service.py
@@ -5,16 +5,32 @@
 from pymongo.collection import Collection
 from bson import ObjectId
 from typing import Optional, Dict, List, Any
-import os
 import copy
 from datetime import datetime, timezone, timedelta
 from pathlib import Path
 from dotenv import load_dotenv
+import os

 # Load environment variables
-backend_dir = Path(__file__).parent.parent
-env_file = backend_dir / ".env"
-load_dotenv(env_file)
+load_dotenv()
+
+# MongoDB connection settings
+MONGO_HOST = os.getenv("MONGO_HOST", "localhost")  # Default to localhost; override for remote access
+MONGO_PORT = int(os.getenv("MONGO_PORT", "27017"))
+MONGO_DB_NAME = os.getenv("MONGO_DB_NAME", "victor_rag")
+MONGO_USER = os.getenv("MONGO_USER", "")
+MONGO_PASSWORD = os.getenv("MONGO_PASSWORD", "")
+
+# Build connection URI
+if MONGO_USER and MONGO_PASSWORD:
+    MONGO_URI = f"mongodb://{MONGO_USER}:{MONGO_PASSWORD}@{MONGO_HOST}:{MONGO_PORT}/{MONGO_DB_NAME}?authSource=admin"
+else:
+    MONGO_URI = f"mongodb://{MONGO_HOST}:{MONGO_PORT}/{MONGO_DB_NAME}"
+
+print(f"MongoDB URI: mongodb://{MONGO_HOST}:{MONGO_PORT}/{MONGO_DB_NAME}")

 # Lazy initialization
 _mongo_client: Optional[MongoClient] = None
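One hardening note on the URI construction: a password containing `@`, `:`, or `/` breaks the raw f-string form, since PyMongo expects userinfo in the URI to be percent-escaped. A sketch of the safer variant:

```python
# Sketch: percent-encode credentials before embedding them in the URI.
from urllib.parse import quote_plus

if MONGO_USER and MONGO_PASSWORD:
    MONGO_URI = (
        f"mongodb://{quote_plus(MONGO_USER)}:{quote_plus(MONGO_PASSWORD)}"
        f"@{MONGO_HOST}:{MONGO_PORT}/{MONGO_DB_NAME}?authSource=admin"
    )
```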
@@ -83,12 +99,22 @@ def prepare_query(query: Dict[str, Any]) -> Dict[str, Any]:

 def get_mongo_client() -> MongoClient:
-    """Get MongoDB client singleton"""
+    """Get MongoDB client instance"""
     global _mongo_client
     if _mongo_client is None:
-        mongo_uri = os.getenv("MONGODB_URI", "mongodb://localhost:27017/")
-        _mongo_client = MongoClient(mongo_uri)
-        print(f"MongoDB client initialized: {mongo_uri}")
+        try:
+            _mongo_client = MongoClient(
+                MONGO_URI,
+                serverSelectionTimeoutMS=5000,  # 5 second timeout
+                connectTimeoutMS=5000,
+                socketTimeoutMS=5000
+            )
+            # Test connection
+            _mongo_client.admin.command('ping')
+            print(f"✅ Connected to MongoDB at {MONGO_HOST}:{MONGO_PORT}")
+        except Exception as e:
+            print(f"❌ Failed to connect to MongoDB: {e}")
+            raise
     return _mongo_client
diff --git a/backend/services/sync_service.py b/backend/services/sync_service.py
index 9ba7443..3ca8d04 100644
--- a/backend/services/sync_service.py
+++ b/backend/services/sync_service.py
@@ -380,3 +380,28 @@ def create_sync_service(drive_folder_id: str = None) -> SyncService:

     # Use the default category mapping for existing folder structure
     return SyncService(drive_folder_id)
+
+from fastapi import APIRouter, BackgroundTasks, HTTPException
+
+router = APIRouter()
+
+@router.post("/sync-local-storage")
+async def sync_local_storage(background_tasks: BackgroundTasks):
+    """
+    Sync local storage files to MongoDB
+
+    Scans all files in local storage and creates MongoDB records
+    """
+    try:
+        from backend.scripts.sync_local_to_mongo import sync_local_storage_to_mongodb
+
+        # Run in background
+        background_tasks.add_task(sync_local_storage_to_mongodb)
+
+        return {
+            "status": "started",
+            "message": "Local storage sync started in background"
+        }
+
+    except Exception as e:
+        raise HTTPException(status_code=500, detail=f"Failed to start sync: {str(e)}")
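Assuming this router gets mounted under the same `/api` prefix as the other routers, kicking off the background sync from a teammate's machine looks like the following (host and port are placeholders):

```python
# Sketch: trigger the background local-storage sync over HTTP.
import requests

resp = requests.post("http://localhost:8000/api/sync-local-storage", timeout=10)
resp.raise_for_status()
print(resp.json())  # expected: {"status": "started", "message": "..."}
```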