diff --git a/pyproject.toml b/pyproject.toml index 872c42e..55d9102 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -30,6 +30,7 @@ dependencies = [ "trafilatura>=1.7.0", "transformers>=4.57.1", "uvicorn>=0.38.0", + "youtube-transcript-api>=0.6.3", ] [dependency-groups] diff --git a/ragitect/services/processor/__init__.py b/ragitect/services/processor/__init__.py index 2fe1813..72a1323 100644 --- a/ragitect/services/processor/__init__.py +++ b/ragitect/services/processor/__init__.py @@ -10,11 +10,19 @@ URLFetchError, WebURLProcessor, ) +from ragitect.services.processor.youtube_processor import ( + InvalidYouTubeURLError, + TranscriptUnavailableError, + YouTubeProcessor, +) __all__ = [ "BaseDocumentProcessor", "ContentExtractionError", + "InvalidYouTubeURLError", "SimpleProcessor", + "TranscriptUnavailableError", "URLFetchError", "WebURLProcessor", + "YouTubeProcessor", ] \ No newline at end of file diff --git a/ragitect/services/processor/youtube_processor.py b/ragitect/services/processor/youtube_processor.py new file mode 100644 index 0000000..c43c0e2 --- /dev/null +++ b/ragitect/services/processor/youtube_processor.py @@ -0,0 +1,385 @@ +"""YouTube URL Processor - Extracts video transcripts and formats as Markdown. + +This processor handles YouTube video ingestion by: +1. Extracting video ID from various YouTube URL formats +2. Fetching transcript (captions/subtitles) via youtube-transcript-api +3. Formatting transcript with timestamps as Markdown for downstream chunking + +Usage: + processor = YouTubeProcessor() + markdown = await processor.process("https://www.youtube.com/watch?v=dQw4w9WgXcQ") + +Supported URL formats: + - https://www.youtube.com/watch?v=VIDEO_ID + - https://youtu.be/VIDEO_ID + - https://www.youtube.com/embed/VIDEO_ID + - https://www.youtube.com/v/VIDEO_ID + +Note: + This processor inherits from BaseDocumentProcessor but overrides with an + async signature. The async process(url: str) method is used for URL-based + transcript fetching. 
# Language code to display name mapping (codes missing here are shown as-is
# by whoever looks them up with a ``.get(code, code)`` style fallback).
LANGUAGE_NAMES: dict[str, str] = {
    "en": "English",
    "en-US": "English (US)",
    "en-GB": "English (UK)",
    "es": "Spanish",
    "fr": "French",
    "de": "German",
    "it": "Italian",
    "pt": "Portuguese",
    "ru": "Russian",
    "ja": "Japanese",
    "ko": "Korean",
    "zh": "Chinese",
    "zh-Hans": "Chinese (Simplified)",
    "zh-Hant": "Chinese (Traditional)",
    "ar": "Arabic",
    "hi": "Hindi",
    "nl": "Dutch",
    "pl": "Polish",
    "tr": "Turkish",
    "vi": "Vietnamese",
    "th": "Thai",
    "id": "Indonesian",
}


class YouTubeProcessorError(Exception):
    """Common base for errors raised while processing a YouTube URL.

    Lets callers catch every processor failure with a single ``except`` while
    still being an ordinary ``Exception`` subclass.
    """

    @property
    def message(self) -> str:
        """Return the descriptive error message.

        Returns:
            The first constructor argument rendered as text, or an empty
            string when the exception was created without arguments.
        """
        # Python 3 exceptions have no built-in ``message`` attribute; derive it
        # from ``args`` so the documented attribute actually exists.
        return str(self.args[0]) if self.args else ""


class InvalidYouTubeURLError(YouTubeProcessorError):
    """Raised when URL is not a valid YouTube video URL.

    Causes:
        - URL doesn't match any YouTube format
        - Video ID cannot be extracted
        - Video is unavailable/deleted/private

    Attributes:
        message: Descriptive error message including URL or video ID
    """


class TranscriptUnavailableError(YouTubeProcessorError):
    """Raised when transcript cannot be retrieved.

    Causes:
        - Transcripts disabled by uploader
        - No transcript available in any language
        - Video is age-restricted

    Attributes:
        message: Descriptive error message including URL for debugging
    """
class YouTubeProcessor(BaseDocumentProcessor):
    """Processor for extracting YouTube video transcripts as Markdown.

    Inherits from BaseDocumentProcessor but provides an async process(url: str)
    method instead of the sync process(file_bytes, file_name) method.

    Implements YouTube transcript extraction with:
    - Support for multiple URL formats (watch, youtu.be, embed, v, shorts, live)
    - Language preference (English first, fallback to first available)
    - Timestamp preservation in [M:SS] or [H:MM:SS] format
    - No API key required (uses public transcript endpoints)

    The youtube-transcript-api library is synchronous, so its calls are run in
    a worker thread via asyncio.to_thread to keep the event loop responsive.

    NOTE(review): this class uses the instance-based API
    (``YouTubeTranscriptApi().list(...)`` and snippet objects with
    ``.text/.start/.duration``), which appears to require
    youtube-transcript-api >= 1.0 - confirm the ``>=0.6.3`` floor declared in
    pyproject.toml.

    Example:
        >>> processor = YouTubeProcessor()
        >>> markdown = await processor.process("https://youtu.be/dQw4w9WgXcQ")
        >>> print(markdown[:200])
        # YouTube Video Transcript

        **Video URL:** https://youtu.be/dQw4w9WgXcQ
        **Language:** English

        ---

        [0:00] We're no strangers to love
        ...
    """

    # Preferred language codes in order of preference
    PREFERRED_LANGUAGES = ["en", "en-US", "en-GB"]

    # Registrable domains accepted for watch/embed style URLs; subdomains such
    # as "www.", "m." and "music." are accepted as well.
    _YOUTUBE_DOMAINS = ("youtube.com", "youtube-nocookie.com")
    # Hosts whose first path segment is the video ID.
    _SHORT_LINK_HOSTS = frozenset({"youtu.be", "www.youtu.be"})
    # Path layouts of the form /<prefix>/<VIDEO_ID>.
    _ID_PATH_PREFIXES = frozenset({"embed", "v", "shorts", "live"})
    # Exactly 11 ID characters that are not the prefix of a longer token.
    _VIDEO_ID_RE = re.compile(r"[A-Za-z0-9_-]{11}(?![A-Za-z0-9_-])")
    # Detects an explicit scheme so scheme-less input can be normalised.
    _SCHEME_RE = re.compile(r"[A-Za-z][A-Za-z0-9+.-]*://")

    @override
    def supported_formats(self) -> list[str]:
        """Return list of supported file extensions.

        YouTubeProcessor is not file-based, so returns empty list.
        URL-based routing is handled separately from file extension routing.

        Returns:
            Empty list (not file-based)
        """
        return []

    async def process(self, url: str) -> str:
        """Fetch YouTube transcript and convert to Markdown.

        Args:
            url: YouTube video URL in any supported format

        Returns:
            Markdown string with video metadata and timestamped transcript

        Raises:
            InvalidYouTubeURLError: If URL is not a valid YouTube URL or video unavailable
            TranscriptUnavailableError: If transcript cannot be retrieved
        """
        # Function-scope imports keep this class self-contained: neither name
        # is imported at module level.
        import asyncio

        from youtube_transcript_api._errors import CouldNotRetrieveTranscript

        logger.info("Processing YouTube URL: %s", url)

        video_id = self._extract_video_id(url)
        logger.debug("Extracted video ID: %s", video_id)

        try:
            # The library performs blocking HTTP requests; run them off the
            # event loop so other coroutines keep making progress.
            transcript = await asyncio.to_thread(self._get_transcript, video_id)
            language_code = await asyncio.to_thread(
                self._get_transcript_language, video_id
            )
        except TranscriptsDisabled as err:
            error_msg = f"Transcripts are disabled for this video: {url}"
            logger.error(error_msg)
            raise TranscriptUnavailableError(error_msg) from err
        except NoTranscriptFound as err:
            error_msg = f"No transcript found for video: {url}"
            logger.error(error_msg)
            raise TranscriptUnavailableError(error_msg) from err
        except VideoUnavailable as err:
            error_msg = f"Video unavailable: {url}"
            logger.error(error_msg)
            raise InvalidYouTubeURLError(error_msg) from err
        except CouldNotRetrieveTranscript as err:
            # Remaining library failures (age restriction, blocked requests,
            # ...) are reported through the documented domain error instead of
            # leaking library-specific exception types to callers.
            error_msg = f"Could not retrieve transcript for video: {url}"
            logger.error(error_msg)
            raise TranscriptUnavailableError(error_msg) from err

        markdown = self._format_markdown(transcript, url, video_id, language_code)
        logger.info(
            "Successfully processed YouTube video %s - %d chars extracted",
            video_id,
            len(markdown),
        )
        return markdown

    def _extract_video_id(self, url: str) -> str:
        """Extract YouTube video ID from various URL formats.

        Supports:
            - https://www.youtube.com/watch?v=VIDEO_ID
            - https://youtu.be/VIDEO_ID
            - https://www.youtube.com/embed/VIDEO_ID
            - https://www.youtube.com/v/VIDEO_ID
            - https://www.youtube.com/shorts/VIDEO_ID
            - https://www.youtube.com/live/VIDEO_ID

        The host is validated, so look-alike domains or YouTube-looking text
        embedded in another site's path/query are rejected. URLs without a
        scheme (e.g. ``youtu.be/VIDEO_ID``) are accepted.

        Args:
            url: YouTube URL to parse

        Returns:
            11-character video ID

        Raises:
            InvalidYouTubeURLError: If video ID cannot be extracted
        """
        candidate = url.strip() if url else ""
        if not candidate:
            raise InvalidYouTubeURLError("Empty URL provided")

        # urlparse only recognises the host when a scheme is present.
        if not self._SCHEME_RE.match(candidate):
            candidate = f"https://{candidate.lstrip('/')}"

        failure = InvalidYouTubeURLError(f"Could not extract video ID from URL: {url}")

        try:
            parsed = urlparse(candidate)
        except ValueError as err:  # e.g. malformed IPv6 literal
            raise failure from err

        host = (parsed.hostname or "").lower()
        segments = [part for part in parsed.path.split("/") if part]

        raw_id = None
        if host in self._SHORT_LINK_HOSTS:
            raw_id = segments[0] if segments else None
        elif self._is_youtube_host(host):
            if len(segments) >= 2 and segments[0] in self._ID_PATH_PREFIXES:
                raw_id = segments[1]
            else:
                raw_id = parse_qs(parsed.query).get("v", [None])[0]

        match = self._VIDEO_ID_RE.match(raw_id) if raw_id else None
        if match is None:
            raise failure
        return match.group(0)

    def _is_youtube_host(self, host: str) -> bool:
        """Return True when *host* is a known YouTube domain or a subdomain of one."""
        return any(
            host == domain or host.endswith(f".{domain}")
            for domain in self._YOUTUBE_DOMAINS
        )

    def _select_transcript(self, video_id: str):
        """Pick the transcript to use: preferred languages first, else first listed.

        Args:
            video_id: YouTube video ID

        Returns:
            The library's transcript object for the chosen language.

        Raises:
            TranscriptsDisabled: If transcripts are disabled for this video
            NoTranscriptFound: If no transcript is available in any language
            VideoUnavailable: If video doesn't exist or is private
        """
        transcript_list = YouTubeTranscriptApi().list(video_id)

        try:
            # find_transcript walks the codes in the order given.
            return transcript_list.find_transcript(list(self.PREFERRED_LANGUAGES))
        except NoTranscriptFound:
            pass  # fall through to "first available" below

        fallback = next(iter(transcript_list), None)
        if fallback is None:
            raise NoTranscriptFound(
                video_id, list(self.PREFERRED_LANGUAGES), transcript_list
            )
        return fallback

    def _get_transcript(self, video_id: str) -> list[dict]:
        """Fetch transcript for video with language preference.

        Tries English transcripts first, falls back to first available language.
        This call blocks on network I/O.

        Args:
            video_id: YouTube video ID

        Returns:
            List of transcript segments with 'text', 'start', 'duration' keys

        Raises:
            TranscriptsDisabled: If transcripts are disabled for this video
            NoTranscriptFound: If no transcript is available in any language
            VideoUnavailable: If video doesn't exist or is private
        """
        fetched = self._select_transcript(video_id).fetch()
        # Convert the library's snippet objects to plain dicts.
        return [
            {"text": snippet.text, "start": snippet.start, "duration": snippet.duration}
            for snippet in fetched
        ]

    def _get_transcript_language(self, video_id: str) -> str:
        """Get the language code of the transcript that would be fetched.

        Shares its selection logic with _get_transcript so both always agree.
        Best-effort: any failure is logged and reported as "unknown" rather
        than raised. This call blocks on network I/O.

        Args:
            video_id: YouTube video ID

        Returns:
            Language code string (e.g., 'en', 'es', 'fr') or "unknown"
        """
        try:
            return self._select_transcript(video_id).language_code
        except Exception:
            # The language is metadata only; never fail the whole request
            # because of it, but leave a trace for debugging.
            logger.warning(
                "Could not determine transcript language for video %s",
                video_id,
                exc_info=True,
            )
            return "unknown"

    def _format_timestamp(self, seconds: float) -> str:
        """Format seconds as [M:SS] or [H:MM:SS] timestamp.

        Args:
            seconds: Time in seconds (fractional part is truncated)

        Returns:
            Formatted timestamp string like [1:05] or [1:01:05]
        """
        minutes, secs = divmod(int(seconds), 60)
        hours, minutes = divmod(minutes, 60)

        if hours > 0:
            return f"[{hours}:{minutes:02d}:{secs:02d}]"
        return f"[{minutes}:{secs:02d}]"

    def _get_language_display_name(self, language_code: str) -> str:
        """Get human-readable language name from code.

        Args:
            language_code: ISO language code (e.g., 'en', 'es')

        Returns:
            Human-readable language name or the code if not found
        """
        return LANGUAGE_NAMES.get(language_code, language_code)

    def _format_markdown(
        self,
        transcript: list[dict],
        url: str,
        video_id: str,
        language_code: str,
    ) -> str:
        """Format transcript segments as Markdown with timestamps.

        Args:
            transcript: List of segments with 'text' and 'start' keys
            url: Original YouTube URL
            video_id: Extracted video ID (currently not rendered; kept for
                interface stability)
            language_code: Detected transcript language

        Returns:
            Formatted Markdown string with metadata and one line per segment
        """
        language_name = self._get_language_display_name(language_code)
        lines = [
            "# YouTube Video Transcript",
            "",
            f"**Video URL:** {url}",
            f"**Language:** {language_name}",
            "",
            "---",
            "",
        ]

        for segment in transcript:
            timestamp = self._format_timestamp(segment["start"])
            # Collapse embedded line breaks / runs of whitespace so every
            # segment stays on the single line that carries its timestamp.
            text = " ".join(segment["text"].split())
            lines.append(f"{timestamp} {text}")

        return "\n".join(lines)
import re
from contextlib import contextmanager
from unittest.mock import patch

import pytest
from youtube_transcript_api._errors import (
    NoTranscriptFound,
    TranscriptsDisabled,
    VideoUnavailable,
)

from ragitect.services.processor.base import BaseDocumentProcessor
from ragitect.services.processor.youtube_processor import (
    InvalidYouTubeURLError,
    TranscriptUnavailableError,
    YouTubeProcessor,
)

# Module-level markers as per project-context.md
pytestmark = [pytest.mark.asyncio]

VIDEO_ID = "test123abcX"
WATCH_URL = f"https://www.youtube.com/watch?v={VIDEO_ID}"
REAL_VIDEO_ID = "dQw4w9WgXcQ"
REAL_WATCH_URL = f"https://www.youtube.com/watch?v={REAL_VIDEO_ID}"


@pytest.fixture
def processor():
    """Fresh processor instance for each test."""
    return YouTubeProcessor()


@contextmanager
def stub_youtube(
    processor, *, transcript=None, error=None, language="en", video_id=VIDEO_ID
):
    """Patch the processor's URL-parsing and network-facing helpers.

    Args:
        processor: Instance whose helpers are patched.
        transcript: Segments returned by ``_get_transcript`` (ignored if *error*).
        error: Exception raised by ``_get_transcript`` instead of returning.
        language: Code returned by ``_get_transcript_language``.
        video_id: ID returned by ``_extract_video_id``.
    """
    transcript_behavior = (
        {"side_effect": error} if error is not None else {"return_value": transcript}
    )
    with (
        patch.object(processor, "_extract_video_id", return_value=video_id),
        patch.object(processor, "_get_transcript", **transcript_behavior),
        patch.object(processor, "_get_transcript_language", return_value=language),
    ):
        yield


def segment(text, start, duration=2.0):
    """Build one transcript segment in the shape ``_get_transcript`` returns."""
    return {"text": text, "start": start, "duration": duration}


class TestYouTubeProcessorInterface:
    """Test YouTubeProcessor class interface and method signatures (AC1)"""

    def test_class_exists(self, processor):
        assert processor is not None

    def test_inherits_from_base_document_processor(self, processor):
        assert isinstance(processor, BaseDocumentProcessor)

    def test_supported_formats_returns_empty_list(self, processor):
        assert processor.supported_formats() == []

    async def test_process_method_signature_async(self, processor):
        with stub_youtube(processor, transcript=[segment("Test", 0.0)]):
            result = await processor.process(WATCH_URL)
        assert isinstance(result, str)


class TestVideoIdExtraction:
    """Test video ID extraction from various URL formats (AC1)"""

    @pytest.mark.parametrize(
        "url",
        [
            pytest.param(REAL_WATCH_URL, id="standard"),
            pytest.param(f"https://youtu.be/{REAL_VIDEO_ID}", id="short"),
            pytest.param(f"https://www.youtube.com/embed/{REAL_VIDEO_ID}", id="embed"),
            pytest.param(f"https://www.youtube.com/v/{REAL_VIDEO_ID}", id="v-path"),
            pytest.param(f"{REAL_WATCH_URL}&t=30s&list=PLtest", id="extra-params"),
            pytest.param(f"https://youtu.be/{REAL_VIDEO_ID}?t=30", id="short-params"),
        ],
    )
    def test_extracts_video_id(self, processor, url):
        assert processor._extract_video_id(url) == REAL_VIDEO_ID

    @pytest.mark.parametrize(
        "url",
        [
            pytest.param("https://example.com/not-youtube", id="other-site"),
            pytest.param("", id="empty"),
            pytest.param("https://www.youtube.com/watch", id="missing-id"),
        ],
    )
    def test_invalid_url_raises_error(self, processor, url):
        with pytest.raises(InvalidYouTubeURLError):
            processor._extract_video_id(url)


class TestTranscriptExtraction:
    """Test transcript extraction and formatting (AC2, AC3)"""

    async def test_process_returns_markdown(self, processor):
        segments = [segment("Hello world", 0.0), segment("This is a test", 2.0, 3.0)]
        with stub_youtube(processor, transcript=segments):
            result = await processor.process(WATCH_URL)

        # A bare '"#" in result' would pass for any URL fragment; pin the heading.
        assert result.startswith("# YouTube Video Transcript")

    async def test_markdown_includes_timestamps(self, processor):
        segments = [segment("First segment", 5.0), segment("Second segment", 65.0, 3.0)]
        with stub_youtube(processor, transcript=segments):
            result = await processor.process(WATCH_URL)

        assert "[0:05] First segment" in result
        assert "[1:05] Second segment" in result

    async def test_markdown_includes_video_url(self, processor):
        test_url = "https://www.youtube.com/watch?v=testVIDEOidX"
        with stub_youtube(
            processor, transcript=[segment("Test", 0.0)], video_id="testVIDEOidX"
        ):
            result = await processor.process(test_url)

        assert f"**Video URL:** {test_url}" in result

    async def test_markdown_includes_transcript_text(self, processor):
        segments = [
            segment("Never gonna give you up", 0.0),
            segment("Never gonna let you down", 2.0),
        ]
        with stub_youtube(processor, transcript=segments):
            result = await processor.process(WATCH_URL)

        assert "Never gonna give you up" in result
        assert "Never gonna let you down" in result

    async def test_hour_long_video_timestamp_format(self, processor):
        segments = [segment("Introduction", 0.0), segment("After one hour", 3665.0)]
        with stub_youtube(processor, transcript=segments):
            result = await processor.process(WATCH_URL)

        assert "[1:01:05] After one hour" in result


class TestTimestampFormatting:
    """Test timestamp formatting helper function"""

    @pytest.mark.parametrize(
        ("seconds", "expected"),
        [
            (0, "[0:00]"),
            (5.5, "[0:05]"),
            (45, "[0:45]"),
            (65, "[1:05]"),
            (600, "[10:00]"),
            (3661, "[1:01:01]"),
            (7200, "[2:00:00]"),
        ],
    )
    def test_format_timestamp(self, processor, seconds, expected):
        assert processor._format_timestamp(seconds) == expected


class TestLanguagePreference:
    """Test multi-language support and preference (AC5)"""

    # The previous '"es" in result.lower()' style fallbacks were satisfied by
    # the URL itself; assert on the rendered metadata line instead.
    @pytest.mark.parametrize(
        ("code", "text", "display_name"),
        [
            pytest.param("en", "English content", "English", id="english-preferred"),
            pytest.param("es", "Contenido en español", "Spanish", id="fallback"),
            pytest.param("fr", "Content", "French", id="metadata"),
            pytest.param("xx", "Content", "xx", id="unknown-code-shown-verbatim"),
        ],
    )
    async def test_language_metadata(self, processor, code, text, display_name):
        with stub_youtube(processor, transcript=[segment(text, 0.0)], language=code):
            result = await processor.process(WATCH_URL)

        assert text in result
        assert f"**Language:** {display_name}" in result


class TestErrorHandling:
    """Test error handling for various failure scenarios (AC4)"""

    async def test_transcripts_disabled_raises_error(self, processor):
        with stub_youtube(processor, error=TranscriptsDisabled("videoId")):
            with pytest.raises(TranscriptUnavailableError) as exc_info:
                await processor.process(WATCH_URL)

        assert "disabled" in str(exc_info.value).lower()
        assert WATCH_URL in str(exc_info.value)

    async def test_no_transcript_found_raises_error(self, processor):
        error = NoTranscriptFound("videoId", ["en"], "Requested")
        with stub_youtube(processor, error=error):
            with pytest.raises(TranscriptUnavailableError) as exc_info:
                await processor.process(WATCH_URL)

        assert "no transcript" in str(exc_info.value).lower()
        assert WATCH_URL in str(exc_info.value)

    async def test_video_unavailable_raises_invalid_url_error(self, processor):
        with stub_youtube(processor, error=VideoUnavailable("videoId")):
            with pytest.raises(InvalidYouTubeURLError) as exc_info:
                await processor.process(WATCH_URL)

        assert "unavailable" in str(exc_info.value).lower()
        assert WATCH_URL in str(exc_info.value)

    async def test_exception_messages_contain_url(self, processor):
        test_video_id = "debugTestId"
        url = f"https://www.youtube.com/watch?v={test_video_id}"
        with stub_youtube(
            processor, error=TranscriptsDisabled(test_video_id), video_id=test_video_id
        ):
            with pytest.raises(TranscriptUnavailableError) as exc_info:
                await processor.process(url)

        assert url in str(exc_info.value)


class TestExceptions:
    """Test custom exception classes exist and are properly defined"""

    @pytest.mark.parametrize(
        "error_cls", [InvalidYouTubeURLError, TranscriptUnavailableError]
    )
    def test_error_class_behaves_like_exception(self, error_cls):
        error = error_cls("Test error message")
        assert isinstance(error, Exception)
        assert str(error) == "Test error message"


@pytest.mark.integration
class TestYouTubeProcessorIntegration:
    """Integration tests with real YouTube API (require network access) (AC6)"""

    TIMESTAMP_RE = re.compile(r"\[\d{1,2}:\d{2}(?::\d{2})?\]")

    async def test_process_real_video_with_captions(self, processor):
        markdown = await processor.process(REAL_WATCH_URL)

        assert len(markdown) > 100
        assert markdown.startswith("# YouTube Video Transcript")
        assert self.TIMESTAMP_RE.search(markdown)

    async def test_real_video_timestamps_preserved(self, processor):
        markdown = await processor.process(REAL_WATCH_URL)

        assert self.TIMESTAMP_RE.findall(markdown), (
            "Should have timestamps in [M:SS] or [H:MM:SS] format"
        )

    async def test_real_video_includes_metadata(self, processor):
        markdown = await processor.process(REAL_WATCH_URL)

        assert f"**Video URL:** {REAL_WATCH_URL}" in markdown
        assert "**Language:**" in markdown

    async def test_markdown_compatible_with_chunking(self, processor):
        # Imported here so a missing chunker only fails this one test.
        from ragitect.services.document_processor import split_markdown_document

        markdown = await processor.process(REAL_WATCH_URL)
        chunks = split_markdown_document(raw_text=markdown, chunk_size=512, overlap=50)

        assert len(chunks) > 0, "Should produce at least one chunk"
        assert all(isinstance(chunk, str) for chunk in chunks), (
            "Chunks should be strings"
        )
        assert all(len(chunk) > 0 for chunk in chunks), "Each chunk should have content"

    async def test_youtu_be_short_url_works(self, processor):
        markdown = await processor.process(f"https://youtu.be/{REAL_VIDEO_ID}")

        assert len(markdown) > 100
        assert markdown.startswith("# YouTube Video Transcript")
"youtube-transcript-api" +version = "1.2.4" +source = { registry = "https://pypi.org/simple" } +dependencies = [ + { name = "defusedxml" }, + { name = "requests" }, +] +sdist = { url = "https://files.pythonhosted.org/packages/60/43/4104185a2eaa839daa693b30e15c37e7e58795e8e09ec414f22b3db54bec/youtube_transcript_api-1.2.4.tar.gz", hash = "sha256:b72d0e96a335df599d67cee51d49e143cff4f45b84bcafc202ff51291603ddcd", size = 469839, upload-time = "2026-01-29T09:09:17.088Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/be/95/129ea37efd6cd6ed00f62baae6543345c677810b8a3bf0026756e1d3cf3c/youtube_transcript_api-1.2.4-py3-none-any.whl", hash = "sha256:03878759356da5caf5edac77431780b91448fb3d8c21d4496015bdc8a7bc43ff", size = 485227, upload-time = "2026-01-29T09:09:15.427Z" }, +] + [[package]] name = "zipp" version = "3.23.0"