
feat: Implement a YouTube watch event data source derived from Browser History #165

Merged

endo-ly merged 14 commits into main from feat/youtube-watch-events on Apr 22, 2026

Conversation

@endo-ly (Owner) commented Apr 21, 2026

Overview

Implemented a pipeline that extracts YouTube watch events from Browser History page views and generates events/youtube/watch_events and master/youtube/{videos,channels}. Also reintroduced YouTube-specific REST endpoints / MCP tools in the backend, making watch history, statistics, and rankings available.

Design policy

  • Browser History ingest keeps storing the raw originals; YouTube is added as a dataset derived from them
  • YouTube metadata resolution reuses the existing YouTubeAPIClient's transform_video_info / transform_channel_info
  • watch_events are stored as fully completed records, while videos / channels are kept separately as normalized dictionaries
  • youtube.md is the source of truth for the backend's public contract
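The third point, completed watch_events records filled in from normalized video/channel dictionaries, can be sketched as follows. This is a hedged illustration only: the function and field names are assumptions, not the actual youtube_metadata.py API.

```python
# Illustrative only: enriching watch events from normalized master dicts.
# Names and record shapes are assumptions, not the real pipeline code.
def enrich_watch_events(events, video_master, channel_master):
    """Return completed watch-event records, filled in from the masters."""
    enriched = []
    for event in events:
        video = video_master.get(event["video_id"], {})
        channel = channel_master.get(video.get("channel_id"), {})
        enriched.append({
            **event,
            "video_title": video.get("title"),
            "channel_id": video.get("channel_id"),
            "channel_name": channel.get("name"),
        })
    return enriched

events = [{"watch_event_id": "we-1", "video_id": "vid1"}]
videos = {"vid1": {"title": "Intro", "channel_id": "ch1"}}
channels = {"ch1": {"name": "Example Channel"}}
print(enrich_watch_events(events, videos, channels))
```

Keeping the masters separate means an unresolved video simply leaves the enrichment fields as None instead of blocking the event record.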

Changes

Pipelines (data collection)

| File | Change |
| --- | --- |
| browser_history/youtube_extraction.py | New: URL normalization, video_id extraction, content_type detection, watch event generation |
| browser_history/youtube_metadata.py | New: YouTube Data API integration, video/channel master construction, watch event enrichment |
| browser_history/ingest_pipeline.py | Changed: integrates YouTube extraction, metadata resolution, and master persistence |

Backend (data-serving API)

| File | Change |
| --- | --- |
| youtube_queries.py | Changed: watch_history → watch_events, JOINs removed, get_top_videos added |
| youtube_repository.py | Changed: 4 methods (get_watch_events, get_watching_stats, get_top_videos, get_top_channels) |
| domain/tools/youtube/stats.py | Changed: 4 tools (get_youtube_watch_events, get_youtube_watching_stats, get_youtube_top_videos, get_youtube_top_channels) |
| api/youtube.py | Changed: /watch-events and /stats/top-videos added, old endpoints replaced |
| api/schemas/data.py | Changed: WatchEventResponse and TopVideoResponse added, existing schemas updated |
| usecases/tools/factory.py | Changed: the 4 YouTube tools re-registered in the registry |

Tests

| Target | Tests | Coverage |
| --- | --- | --- |
| Pipeline: YouTube extraction | 13 | URL normalization, video_id extraction, content_type detection, month grouping |
| Pipeline: metadata resolution | 7 | API integration, master construction, enrichment, failure protection |
| Backend: queries | 23 | watch_events query, stats aggregation, top_videos/channels, partition paths |
| Backend: repository | 6 | contracts of the 4 methods, backward-compat alias |
| Backend: domain tools | 28 | name/description/schema/execute for each of the 4 tools |
| Backend: API integration | 7 | REST API field validation, MCP registry/call_tool |
| Total | 84 | all 581 tests pass, no regressions in existing tests |

References

Commit breakdown

  1. feat: add youtube watch event extraction from browser history
  2. refactor: align backend youtube queries with watch events
  3. feat: resolve youtube metadata and persist masters
  4. feat: expose youtube watch event tools via api and mcp
  5. docs: align youtube implementation docs and config
  6. style: fix lint and format issues in youtube implementation

Summary by CodeRabbit

  • New features
    • Added YouTube watch event APIs (watch-events/watching/top-videos/top-channels) and 4 MCP tools
    • Added a workflow that extracts, aggregates, and persists YouTube watch events from browser history per sync_id
    • Added metadata retrieval via the YouTube API and master persistence for videos/channels
  • Documentation
    • Updated the backend API design and pipeline docs to the new data contracts
  • Tests
    • Added and updated many unit and integration tests around the API, pipeline, and storage

@coderabbitai

coderabbitai Bot commented Apr 21, 2026

Note

Reviews paused

It looks like this branch is under active development. To avoid overwhelming you with review comments due to an influx of new commits, CodeRabbit has automatically paused this review. You can configure this behavior by changing the reviews.auto_review.auto_pause_after_reviewed_commits setting.

Use the following commands to manage reviews:

  • @coderabbitai resume to resume automatic reviews.
  • @coderabbitai review to trigger a single review.


Walkthrough

Added a pipeline that extracts and persists YouTube watch events from browser history and resolves metadata via the YouTube Data API to build masters, along with the backend API and MCP tools that expose them. A sync_id-based event workflow and S3/Parquet storage are introduced.

Changes

Cohort / File(s) Summary
Documentation
docs/20.egograph/backend/architecture.md, docs/20.egograph/pipelines/youtube.md
Updated the YouTube description to concrete Parquet paths. Added the API endpoint and MCP tool lists. Changed the pipeline spec to be sync_id-based.
Backend API schemas / registration
egograph/backend/api/__init__.py, egograph/backend/api/schemas/__init__.py, egograph/backend/api/schemas/data.py
Exported the YouTube router. Replaced the WatchHistory response schemas with WatchEvent ones and added TopVideoResponse. Changed types and fields to be event-oriented.
Backend API implementation
egograph/backend/api/youtube.py, egograph/backend/main.py
Implemented and registered the /watch-events and /stats/* endpoints. Changed repository calls to watch_events/top_videos etc.
Domain tools
egograph/backend/domain/tools/youtube/stats.py, egograph/backend/usecases/tools/__init__.py, egograph/backend/usecases/tools/factory.py
Renamed/added tools under a YouTube prefix (watch_events/watching/top_videos/top_channels) and registered them in the ToolRegistry.
DB queries / repository
egograph/backend/infrastructure/database/youtube_queries.py, egograph/backend/infrastructure/repositories/youtube_repository.py
Migrated from the watch_history path to watch_events. Refactored to join masters via a CTE. Changed aggregation from seconds to event counts. Added get_top_videos.
Tests (backend)
egograph/backend/tests/... (multiple)
Updated test fixtures and unit/integration tests for the watch_events/channels schema and the new tool names. Added new integration tests.
YouTube pipeline (new package)
egograph/pipelines/sources/youtube/... (api_client.py, canonical.py, config.py, extraction.py, metadata.py, pipeline.py, storage.py, __init__.py)
Implemented browser history → watch event extraction, a YouTube API client, master generation, S3/Parquet persistence, and sync-state management.
Browser history integration / service
egograph/pipelines/sources/browser_history/pipeline.py, egograph/pipelines/sources/browser_history/transform.py, egograph/pipelines/sources/browser_history/__init__.py, egograph/pipelines/api/browser_history.py, egograph/pipelines/service.py
Added sync_id to page views. Added enqueueing of a YouTube ingest event after compaction, plus youtube_run_id/youtube_error in the API response.
Relocation / consolidation of existing code
egograph/pipelines/sources/google_activity/*
Moved the old YouTube client/transforms into the youtube package and unified the transform function calls. Deleted the old youtube_api.py.
Workflow registration
egograph/pipelines/workflows/registry.py
Added the event-driven youtube_ingest_workflow to the workflow set (3600 s timeout).
Pipeline tests
egograph/pipelines/tests/unit/..., egograph/pipelines/tests/integration/...
Added and updated unit/integration tests for extraction, metadata resolution, persistence, and workflow behavior.
Import cleanup (test files)
Multiple test files (egograph/backend/tests/integration/test_mcp_endpoint.py, egograph/pipelines/tests/integration/github/*, egograph/pipelines/tests/unit/* etc.)
Reordered imports and tidied blank lines; moved some imports. Test logic essentially unchanged.

Sequence Diagram(s)

sequenceDiagram
    participant Client as Client
    participant API as BrowserHistory API<br/>/ingest
    participant Service as PipelineService
    participant Orchestrator as Workflow Orchestrator
    participant YouTubeWF as YouTube Ingest<br/>(in-process)
    participant Storage as YouTubeStorage (S3/Parquet)
    participant YouTubeAPI as YouTube Data API
    participant Repo as YouTube Repository

    Client->>API: POST compaction result
    API->>Service: enqueue_youtube_ingest(sync_id, target_months)
    Service->>Orchestrator: enqueue_event_run(payload)
    Orchestrator->>YouTubeWF: run_youtube_ingest(run)
    YouTubeWF->>Storage: load_browser_history_page_views(sync_id, months)
    YouTubeWF->>YouTubeWF: extract_youtube_watch_events(page_views)
    YouTubeWF->>YouTubeAPI: get_videos(video_ids)
    YouTubeWF->>YouTubeAPI: get_channels(channel_ids)
    YouTubeWF->>Storage: save_watch_events(year,month,sync_id)
    YouTubeWF->>Storage: save_video_master()
    YouTubeWF->>Storage: save_channel_master()
    YouTubeWF->>Storage: mark_sync_processed(sync_id)
    Client->>Repo: GET /v1/data/youtube/watch-events
    Repo->>Storage: read watch_events/master parquet
    Repo-->>Client: return WatchEventResponse[]

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~60 minutes

Poem

🐰 A new path is laid, sync_id in paw,
From history come events, blooming through the API,
With masters saved and lined up neat,
Four endpoints and four tools, hooray!
The code dances, the data sings 🎶

🚥 Pre-merge checks | ✅ 5
✅ Passed checks (5 passed)

| Check name | Status | Explanation |
| --- | --- | --- |
| Description Check | ✅ Passed | Check skipped - CodeRabbit's high-level summary is enabled. |
| Title check | ✅ Passed | The title concisely expresses the PR's main purpose, implementing the YouTube watch event data source from Browser History, and matches the changes. |
| Docstring Coverage | ✅ Passed | Docstring coverage is 87.50%, which is sufficient. The required threshold is 80.00%. |
| Linked Issues check | ✅ Passed | Check skipped because no linked issues were found for this pull request. |
| Out of Scope Changes check | ✅ Passed | Check skipped because no linked issues were found for this pull request. |


@coderabbitai coderabbitai Bot left a comment

Actionable comments posted: 6

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (4)
egograph/backend/tests/unit/repositories/test_youtube_repository.py (1)

33-47: ⚠️ Potential issue | 🟠 Major

The patch target is off, so the Repository's connection is never actually replaced

YouTubeRepository imports DuckDBConnection directly at line 13 via from backend.infrastructure.database import DuckDBConnection, so patching backend.infrastructure.database.DuckDBConnection does not swap the reference inside that module. The tests end up using a real connection, which makes them flaky depending on the execution environment.

Suggested fix
         with patch(
-            "backend.infrastructure.database.DuckDBConnection"
+            "backend.infrastructure.repositories.youtube_repository.DuckDBConnection"
         ) as mock_conn_class:

Apply the same replacement to the 4 spots in this file (lines 34, 60, 89, 115).

Also applies to: 59-74, 88-102, 114-128

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@egograph/backend/tests/unit/repositories/test_youtube_repository.py` around
lines 33 - 47, The test is patching
backend.infrastructure.database.DuckDBConnection, but YouTubeRepository directly
imports DuckDBConnection into its module, so the patch doesn't replace the
reference used by YouTubeRepository; update the tests to patch the
DuckDBConnection symbol in the YouTubeRepository module instead (patch
"backend.repositories.youtube_repository.DuckDBConnection") wherever you
currently patch the infrastructure path (the four spots around the existing
patches in test_youtube_repository.py), so the MagicMock replaces the connection
used by YouTubeRepository when instantiating and calling get_watch_events.
egograph/backend/api/youtube.py (1)

36-78: ⚠️ Potential issue | 🟠 Major

Cast watched_at_utc to match the response schema

In the implementation the parquet column is a TIMESTAMP and the query returns w.watched_at_utc as-is (evident from the WHERE w.watched_at_utc::DATE cast). When the data flows through DuckDB into a pandas.DataFrame it becomes a datetime object and stays one through .to_dict(orient="records"). The WatchEventResponse schema, however, requires watched_at_utc: str.

The tests pass because the mocks return strings, but real DuckDB results are actual datetime objects, so this relies on Pydantic's implicit str() conversion, which is brittle.

Either cast explicitly to an ISO string in SQL or convert on the endpoint side, and keep the contract with the schema.

🐰 Suggested fix (absorb on the endpoint side)
-from datetime import date
+from datetime import date, datetime
@@
     repository = YouTubeRepository(config.r2)
-    return repository.get_watch_events(start, end, validated_limit)
+    watch_events = repository.get_watch_events(start, end, validated_limit)
+    for event in watch_events:
+        watched_at = event.get("watched_at_utc")
+        if isinstance(watched_at, datetime):
+            event["watched_at_utc"] = watched_at.isoformat()
+    return watch_events
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@egograph/backend/api/youtube.py` around lines 36 - 78, The endpoint currently
returns datetime objects for watched_at_utc but WatchEventResponse expects a
str; after calling YouTubeRepository.get_watch_events in
get_watch_events_endpoint, iterate the returned records and convert each
record['watched_at_utc'] (if it's a datetime) to an ISO-8601 string (e.g., using
dt.isoformat() and normalizing naive datetimes to UTC) before returning; update
get_watch_events_endpoint to perform this conversion (referencing
get_watch_events_endpoint, YouTubeRepository.get_watch_events, and the
watched_at_utc/WatchEventResponse fields) so the response always matches the
schema.
egograph/backend/infrastructure/database/youtube_queries.py (2)

168-195: ⚠️ Potential issue | 🔴 Critical

Don't inline LIMIT into the SQL string

Only limit is embedded via f-string instead of a placeholder, so any call that bypasses upstream validation can cause SQL injection or malformed SQL. Parameterize it the same way as get_top_videos.

🐰 Suggested fix
     query = """
         SELECT
             w.watch_event_id,
             w.watched_at_utc,
             w.video_id,
             w.video_url,
             w.video_title,
             w.channel_id,
             w.channel_name,
             w.content_type
         FROM read_parquet(?) w
         WHERE w.watched_at_utc::DATE BETWEEN ? AND ?
         ORDER BY w.watched_at_utc DESC
     """
+    query_params: list[Any] = [partition_paths, params.start_date, params.end_date]
     if limit is not None:
-        query += f"\n        LIMIT {limit}"
+        query += "\n        LIMIT ?"
+        query_params.append(limit)
 
     logger.debug(
         "Executing get_watch_events: %s to %s, limit=%s",
         params.start_date,
         params.end_date,
@@
     return execute_query(
         params.conn,
         query,
-        [partition_paths, params.start_date, params.end_date],
+        query_params,
     )
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@egograph/backend/infrastructure/database/youtube_queries.py` around lines 168
- 195, The SQL string in get_watch_events appends the limit via an f-string
(variable limit) which risks SQL injection; change this to use a parameterized
placeholder like the other params and pass the limit as an additional parameter
to execute_query (e.g., append limit to the parameters list only when limit is
not None), validating/casting limit to an integer beforehand; update the query
construction (variable query) to include "LIMIT ?" instead of f-string and
ensure execute_query(params.conn, query, ...) receives the matching parameter
order.

349-358: ⚠️ Potential issue | 🟠 Major

Don't mix NULL channel_id into top channels

Events whose metadata is unresolved can have NULL channel_id / channel_name, so the current GROUP BY lumps every unknown into a single huge "channel" that can rank at the top. Either exclude NULLs from top channels or aggregate them explicitly as a separate bucket.

🐰 Suggested fix
     query = """
         SELECT
             w.channel_id,
-            w.channel_name,
+            MAX(w.channel_name) as channel_name,
             COUNT(*) as watch_event_count,
             COUNT(DISTINCT w.video_id) as unique_video_count
         FROM read_parquet(?) w
-        WHERE w.watched_at_utc::DATE BETWEEN ? AND ?
-        GROUP BY w.channel_id, w.channel_name
+        WHERE w.watched_at_utc::DATE BETWEEN ? AND ?
+          AND w.channel_id IS NOT NULL
+        GROUP BY w.channel_id
         ORDER BY watch_event_count DESC
         LIMIT ?
     """
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@egograph/backend/infrastructure/database/youtube_queries.py` around lines 349
- 358, The SQL builds a top-channels aggregation into the variable `query` but
currently groups NULL `w.channel_id`/`w.channel_name` into one large "unknown"
channel; update the query string so NULL channel_id entries are handled
explicitly — either exclude them by adding a filter like `AND w.channel_id IS
NOT NULL` to the WHERE clause, or add a CASE/COALESCE to group NULLs into an
explicit "Unknown" bucket (e.g., CASE WHEN w.channel_id IS NULL THEN 'unknown'
END) so the top-channel results don't include a single NULL-aggregated channel;
change the `query` variable accordingly referencing
`w.channel_id`/`w.channel_name`.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@docs/20.egograph/backend/architecture.md`:
- Line 92: The YouTube data source row was added but the REST API 一覧 and 登録ツール一覧
sections in architecture.md were not updated; edit the same file to add matching
public API endpoints and tool entries for YouTube: include routes/names like
`events/youtube/watch_events/` (watch-events), `master/youtube/videos/`
(top-videos), `master/youtube/channels/` (top-channels) and the
viewing-statistics registration/tool (視聴統計ツール) using the actual route names and
tool names used elsewhere in the project, updating the REST API 一覧 and 登録ツール一覧
sections so the architecture.md reflects the publicly exposed endpoints and
tools for YouTube.

In `@egograph/backend/infrastructure/database/youtube_queries.py`:
- Around line 296-307: The query groups by too many fields causing the same
video to appear multiple times; modify the SQL (the query variable in
youtube_queries.py) to GROUP BY only w.video_id and change video_title,
channel_id, channel_name to aggregated expressions (e.g., MAX(w.video_title) AS
video_title, MAX(w.channel_id) AS channel_id, MAX(w.channel_name) AS
channel_name) while keeping COUNT(*) AS watch_event_count and ORDER BY
watch_event_count DESC LIMIT ? so ranking is keyed by video_id.

In `@egograph/backend/tests/conftest.py`:
- Around line 150-153: Docstring for the property test_watches_parquet_path uses
full-width (全角) parentheses which triggers Ruff RUF002; update the docstring in
the test_watches_parquet_path property to use ASCII parentheses
(test_watch_events_parquet_path へのエイリアス) instead of 全角括弧 so the linter stops
flagging it; locate the property definition named test_watches_parquet_path and
replace the full-width brackets in its triple-quoted string with standard ASCII
parentheses, keeping the rest of the text unchanged and ensuring it still
references test_watch_events_parquet_path.

In `@egograph/pipelines/sources/browser_history/ingest_pipeline.py`:
- Around line 99-102: The video_master/channel_master obtained from resolved are
passed to save_youtube_masters(storage, video_master, channel_master), but the
return value is ignored, so master-save failures are swallowed. Capture the
success/failure return value of save_youtube_masters; on failure, log the error
and abort (or explicitly retry / raise from) the subsequent youtube_events save
and state update. The relevant spots are the resolved block and the
youtube_events/state persistence that follows (see youtube_events, video_master,
channel_master, save_youtube_masters).

In `@egograph/pipelines/sources/browser_history/youtube_extraction.py`:
- Around line 135-149: The watch_event_id is currently generated with
uuid.uuid4() causing duplicate imports to create new events; change it to a
deterministic ID derived from the row["page_view_id"] and video_id (e.g., use a
stable hash or UUID5) so re-runs/backfills produce the same watch_event_id.
Update the construction of the "watch_event_id" field (referencing
_WATCH_EVENT_PREFIX and the events dict where watch_event_id is set) to compute
a stable identifier from page_view_id + video_id and prefix it with
_WATCH_EVENT_PREFIX instead of using uuid.uuid4().

In `@egograph/pipelines/tests/unit/browser_history/test_youtube_metadata.py`:
- Around line 253-277: The test named
test_update_youtube_ingest_state_only_after_all_outputs_saved currently only
calls resolve_youtube_metadata and asserts its return values, so it doesn't
verify ingest state updates or behavior on save failures; either rename it to
reflect it's a metadata-resolution unit test or convert it into an
integration-style test that calls run_browser_history_pipeline with a mocked
storage/client to assert state is updated only after both video_master and
channel_master (and watch_events) are persisted and that state remains unchanged
when any of those saves fail; use the existing mocks
(mock_client/get_videos/get_channels) and inject failing behavior for the
persistence layer to assert the two scenarios.

---

Outside diff comments:
In `@egograph/backend/api/youtube.py`:
- Around line 36-78: The endpoint currently returns datetime objects for
watched_at_utc but WatchEventResponse expects a str; after calling
YouTubeRepository.get_watch_events in get_watch_events_endpoint, iterate the
returned records and convert each record['watched_at_utc'] (if it's a datetime)
to an ISO-8601 string (e.g., using dt.isoformat() and normalizing naive
datetimes to UTC) before returning; update get_watch_events_endpoint to perform
this conversion (referencing get_watch_events_endpoint,
YouTubeRepository.get_watch_events, and the watched_at_utc/WatchEventResponse
fields) so the response always matches the schema.

In `@egograph/backend/infrastructure/database/youtube_queries.py`:
- Around line 168-195: The SQL string in get_watch_events appends the limit via
an f-string (variable limit) which risks SQL injection; change this to use a
parameterized placeholder like the other params and pass the limit as an
additional parameter to execute_query (e.g., append limit to the parameters list
only when limit is not None), validating/casting limit to an integer beforehand;
update the query construction (variable query) to include "LIMIT ?" instead of
f-string and ensure execute_query(params.conn, query, ...) receives the matching
parameter order.
- Around line 349-358: The SQL builds a top-channels aggregation into the
variable `query` but currently groups NULL `w.channel_id`/`w.channel_name` into
one large "unknown" channel; update the query string so NULL channel_id entries
are handled explicitly — either exclude them by adding a filter like `AND
w.channel_id IS NOT NULL` to the WHERE clause, or add a CASE/COALESCE to group
NULLs into an explicit "Unknown" bucket (e.g., CASE WHEN w.channel_id IS NULL
THEN 'unknown' END) so the top-channel results don't include a single
NULL-aggregated channel; change the `query` variable accordingly referencing
`w.channel_id`/`w.channel_name`.

In `@egograph/backend/tests/unit/repositories/test_youtube_repository.py`:
- Around line 33-47: The test is patching
backend.infrastructure.database.DuckDBConnection, but YouTubeRepository directly
imports DuckDBConnection into its module, so the patch doesn't replace the
reference used by YouTubeRepository; update the tests to patch the
DuckDBConnection symbol in the YouTubeRepository module instead (patch
"backend.repositories.youtube_repository.DuckDBConnection") wherever you
currently patch the infrastructure path (the four spots around the existing
patches in test_youtube_repository.py), so the MagicMock replaces the connection
used by YouTubeRepository when instantiating and calling get_watch_events.

ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: d57138fc-6354-4c1c-a126-464fc956bab3

📥 Commits

Reviewing files that changed from the base of the PR and between 1f42bed and ddc685a.

📒 Files selected for processing (23)
  • docs/20.egograph/backend/architecture.md
  • docs/20.egograph/pipelines/youtube.md
  • egograph/backend/api/__init__.py
  • egograph/backend/api/schemas/__init__.py
  • egograph/backend/api/schemas/data.py
  • egograph/backend/api/youtube.py
  • egograph/backend/domain/tools/youtube/stats.py
  • egograph/backend/infrastructure/database/youtube_queries.py
  • egograph/backend/infrastructure/repositories/youtube_repository.py
  • egograph/backend/main.py
  • egograph/backend/tests/conftest.py
  • egograph/backend/tests/domain/tools/youtube/test_stats.py
  • egograph/backend/tests/integration/test_youtube_api.py
  • egograph/backend/tests/unit/repositories/test_youtube_queries.py
  • egograph/backend/tests/unit/repositories/test_youtube_repository.py
  • egograph/backend/usecases/tools/__init__.py
  • egograph/backend/usecases/tools/factory.py
  • egograph/pipelines/sources/browser_history/ingest_pipeline.py
  • egograph/pipelines/sources/browser_history/youtube_extraction.py
  • egograph/pipelines/sources/browser_history/youtube_metadata.py
  • egograph/pipelines/tests/unit/browser_history/test_youtube_extraction.py
  • egograph/pipelines/tests/unit/browser_history/test_youtube_metadata.py
  • egograph/pipelines/tests/unit/test_settings.py

Comment thread docs/20.egograph/backend/architecture.md Outdated
Comment thread egograph/backend/infrastructure/database/youtube_queries.py Outdated
Comment thread egograph/backend/tests/conftest.py Outdated
Comment on lines +99 to +102
if resolved is not None:
    youtube_events, video_master, channel_master = resolved
    save_youtube_masters(storage, video_master, channel_master)
else:


⚠️ Potential issue | 🟠 Major

Don't swallow master-save failures

save_youtube_masters(...) returns success or failure, but the return value is currently discarded. Even if only the video/channel master save fails, the watch events and state still advance, so you would never notice that the derived data is incomplete.

🐰 Suggested fix
             resolved = resolve_youtube_metadata(youtube_events, youtube_api_client)
             if resolved is not None:
                 youtube_events, video_master, channel_master = resolved
-                save_youtube_masters(storage, video_master, channel_master)
+                if not save_youtube_masters(storage, video_master, channel_master):
+                    logger.warning(
+                        "Failed to save YouTube master parquet; "
+                        "watch events will still be saved"
+                    )
             else:
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
 if resolved is not None:
     youtube_events, video_master, channel_master = resolved
-    save_youtube_masters(storage, video_master, channel_master)
+    if not save_youtube_masters(storage, video_master, channel_master):
+        logger.warning(
+            "Failed to save YouTube master parquet; "
+            "watch events will still be saved"
+        )
 else:
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@egograph/pipelines/sources/browser_history/ingest_pipeline.py` around lines
99 - 102, The video_master/channel_master obtained from resolved are passed to
save_youtube_masters(storage, video_master, channel_master), but the return
value is ignored, so master-save failures are swallowed. Capture the
success/failure return value of save_youtube_masters; on failure, log the error
and abort (or explicitly retry / raise from) the subsequent youtube_events save
and state update. The relevant spots are the resolved block and the
youtube_events/state persistence that follows (see youtube_events, video_master,
channel_master, save_youtube_masters).

Comment thread egograph/pipelines/sources/youtube/extraction.py
Comment thread egograph/pipelines/tests/unit/browser_history/test_youtube_metadata.py Outdated
- docs: add YouTube REST API endpoints and tools to architecture.md
- fix: GROUP BY video_id/channel_id only in top queries (prevent duplicate ranking rows)
- fix: check save_youtube_masters return value and log warning on failure
- fix: use deterministic uuid5 for watch_event_id (prevent duplicates on re-import)
- fix: remove backward-compat alias property from conftest.py
- fix: rename test to match actual behavior
- style: ruff format fixes in existing test files
@endo-ly (Owner, Author) commented Apr 22, 2026

Review responses complete

Review comments addressed: 6

| File | Fix | Status |
| --- | --- | --- |
| docs/20.egograph/backend/architecture.md | Added the 4 YouTube endpoints/tools to the REST API and registered-tool lists | ✅ Fixed |
| egograph/backend/infrastructure/database/youtube_queries.py | Changed GROUP BY in get_top_videos/get_top_channels to video_id/channel_id only, aggregating display fields with MAX() (prevents ranking rows from splitting for same-name videos) | ✅ Fixed |
| egograph/backend/tests/conftest.py | Removed the backward-compat alias property test_watches_parquet_path (per the project policy that backward compatibility is debt) | ✅ Fixed |
| egograph/pipelines/sources/browser_history/ingest_pipeline.py | Check the return value of save_youtube_masters and log a warning on failure | ✅ Fixed |
| egograph/pipelines/sources/browser_history/youtube_extraction.py | Changed uuid.uuid4() to uuid.uuid5(NAMESPACE_URL, "browser_history:{source_event_id}:{video_id}") (prevents duplicate events on re-runs) | ✅ Fixed |
| egograph/pipelines/tests/unit/browser_history/test_youtube_metadata.py | Renamed the test to test_resolve_youtube_metadata_returns_enriched_events_and_masters (to match actual behavior) | ✅ Fixed |
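The deterministic ID change above can be sketched as follows. The name material ("browser_history:{source_event_id}:{video_id}") follows the table entry, but the helper name is illustrative, not the actual youtube_extraction.py code.

```python
import uuid

# Hedged sketch of the uuid5-based watch_event_id scheme described above;
# the helper name is an assumption for illustration.
def make_watch_event_id(source_event_id: str, video_id: str) -> str:
    """Derive a stable watch_event_id from its source page view and video."""
    name = f"browser_history:{source_event_id}:{video_id}"
    return str(uuid.uuid5(uuid.NAMESPACE_URL, name))

# Re-running the pipeline over the same page view yields the same id,
# so backfills no longer create duplicate watch events.
assert make_watch_event_id("pv-123", "dQw4w9WgXcQ") == make_watch_event_id("pv-123", "dQw4w9WgXcQ")
assert make_watch_event_id("pv-123", "dQw4w9WgXcQ") != make_watch_event_id("pv-456", "dQw4w9WgXcQ")
```

Unlike uuid4, uuid5 hashes the namespace plus name, so the same inputs always map to the same identifier.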

@coderabbitai coderabbitai Bot left a comment

Actionable comments posted: 3

🧹 Nitpick comments (1)
egograph/backend/infrastructure/database/youtube_queries.py (1)

182-196: LIMIT handling is inconsistent with get_top_videos/get_top_channels

Only get_watch_events embeds LIMIT {limit} via f-string; the other two functions parameterize it as LIMIT ?. Since limit is int | None there is no real harm (the type guards it), but aligning them avoids accidents in static analysis or future refactors.

🐰 Suggested diff
     query = """
         SELECT
             w.watch_event_id,
             w.watched_at_utc,
             w.video_id,
             w.video_url,
             w.video_title,
             w.channel_id,
             w.channel_name,
             w.content_type
         FROM read_parquet(?) w
         WHERE w.watched_at_utc::DATE BETWEEN ? AND ?
         ORDER BY w.watched_at_utc DESC
     """
-    if limit is not None:
-        query += f"\n        LIMIT {limit}"
+    sql_params: list[Any] = [partition_paths, params.start_date, params.end_date]
+    if limit is not None:
+        query += "\n        LIMIT ?"
+        sql_params.append(limit)

     logger.debug(
         "Executing get_watch_events: %s to %s, limit=%s",
         params.start_date,
         params.end_date,
         limit,
     )

     return execute_query(
         params.conn,
         query,
-        [partition_paths, params.start_date, params.end_date],
+        sql_params,
     )
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@egograph/backend/infrastructure/database/youtube_queries.py` around lines 182
- 196, get_watch_events currently injects limit via an f-string while
get_top_videos/get_top_channels use a parameterized "LIMIT ?"; change
get_watch_events to append "\n        LIMIT ?" when limit is not None and pass
limit as the final element in the execute_query parameters list (i.e., add limit
to the existing [partition_paths, params.start_date, params.end_date] array),
keeping the conditional so limit remains optional and matching the
parameterization style of get_top_videos/get_top_channels.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@egograph/backend/infrastructure/database/youtube_queries.py`:
- Around line 349-360: Several aggregation queries in youtube_queries.py
(including get_top_channels, get_watching_stats, get_top_videos) currently
include rows with NULL channel_id, causing NULL to be aggregated as a fake
channel; update each SQL query (e.g., the query string variable used in
get_top_channels shown above) to filter out NULLs by adding "WHERE w.channel_id
IS NOT NULL" (or add "AND w.channel_id IS NOT NULL" if a WHERE clause already
exists) so that COUNT(DISTINCT w.channel_id), MAX(w.channel_id) and other
aggregations exclude NULL channel_id rows and return correct results.

In `@egograph/pipelines/sources/browser_history/ingest_pipeline.py`:
- Around line 92-95: The comment beside the youtube_events handling uses a
full-width closing parenthesis which triggers Ruff RUF003; update the comment
text in the ingest_pipeline.py near the youtube_events block (around the
extract_youtube_watch_events(rows) / youtube_events check) to replace the
full-width bracket 「)」 with a half-width parenthesis “)”, e.g. change "API
クライアントが利用可能な場合)" to "API クライアントが利用可能な場合)".
- Around line 112-126: The loop that saves monthly YouTube watch events
(youtube_monthly from group_watch_events_by_month) currently only logs warnings
on individual save_parquet failures, causing last_sync_status="events_saved" to
incorrectly mark success; change this by collecting failed months (e.g.,
failed_youtube_months list) while iterating over youtube_monthly and on any
failure call logger.error and increment a metric/counter, then return that
failure info in the pipeline result (extend BrowserHistoryPipelineResult with
youtube_events_saved: bool or failed_youtube_months: List[Tuple[int,int]]), and
set last_sync_status to a distinct value like "events_saved_partial" when
failures exist so callers can observe partial failures rather than silently
succeeding.

---

Nitpick comments:
In `@egograph/backend/infrastructure/database/youtube_queries.py`:
- Around line 182-196: get_watch_events currently injects limit via an f-string
while get_top_videos/get_top_channels use a parameterized "LIMIT ?"; change
get_watch_events to append "\n        LIMIT ?" when limit is not None and pass
limit as the final element in the execute_query parameters list (i.e., add limit
to the existing [partition_paths, params.start_date, params.end_date] array),
keeping the conditional so limit remains optional and matching the
parameterization style of get_top_videos/get_top_channels.

ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: 5b8c09b4-29fe-4687-b6e7-3b9971be534f

📥 Commits

Reviewing files that changed from the base of the PR and between ddc685a and 68167da.

📒 Files selected for processing (27)
  • docs/20.egograph/backend/architecture.md
  • egograph/backend/infrastructure/database/youtube_queries.py
  • egograph/backend/tests/conftest.py
  • egograph/backend/tests/integration/test_mcp_endpoint.py
  • egograph/pipelines/sources/browser_history/ingest_pipeline.py
  • egograph/pipelines/sources/browser_history/youtube_extraction.py
  • egograph/pipelines/tests/integration/github/test_compact.py
  • egograph/pipelines/tests/integration/github/test_enrichment.py
  • egograph/pipelines/tests/integration/github/test_incremental.py
  • egograph/pipelines/tests/integration/github/test_ingest.py
  • egograph/pipelines/tests/unit/browser_history/test_youtube_metadata.py
  • egograph/pipelines/tests/unit/github/test_collector.py
  • egograph/pipelines/tests/unit/github/test_pipeline.py
  • egograph/pipelines/tests/unit/google_activity/test_collector.py
  • egograph/pipelines/tests/unit/google_activity/test_config.py
  • egograph/pipelines/tests/unit/google_activity/test_main.py
  • egograph/pipelines/tests/unit/google_activity/test_pipeline.py
  • egograph/pipelines/tests/unit/google_activity/test_schema.py
  • egograph/pipelines/tests/unit/google_activity/test_storage.py
  • egograph/pipelines/tests/unit/google_activity/test_youtube_api.py
  • egograph/pipelines/tests/unit/spotify/test_collector.py
  • egograph/pipelines/tests/unit/test_api.py
  • egograph/pipelines/tests/unit/test_api_dependencies.py
  • egograph/pipelines/tests/unit/test_api_endpoints.py
  • egograph/pipelines/tests/unit/test_bootstrap_compact.py
  • egograph/pipelines/tests/unit/test_compaction.py
  • egograph/pipelines/tests/unit/test_provider_entrypoints.py
💤 Files with no reviewable changes (11)
  • egograph/backend/tests/integration/test_mcp_endpoint.py
  • egograph/pipelines/tests/unit/google_activity/test_schema.py
  • egograph/pipelines/tests/unit/test_api_endpoints.py
  • egograph/pipelines/tests/unit/google_activity/test_config.py
  • egograph/pipelines/tests/unit/google_activity/test_main.py
  • egograph/pipelines/tests/unit/test_compaction.py
  • egograph/pipelines/tests/unit/google_activity/test_youtube_api.py
  • egograph/pipelines/tests/unit/google_activity/test_pipeline.py
  • egograph/pipelines/tests/unit/google_activity/test_collector.py
  • egograph/pipelines/tests/unit/google_activity/test_storage.py
  • egograph/pipelines/tests/unit/spotify/test_collector.py
✅ Files skipped from review due to trivial changes (10)
  • egograph/pipelines/tests/integration/github/test_compact.py
  • egograph/pipelines/tests/unit/github/test_collector.py
  • egograph/pipelines/tests/unit/test_bootstrap_compact.py
  • egograph/pipelines/tests/unit/test_api.py
  • egograph/pipelines/tests/integration/github/test_enrichment.py
  • egograph/pipelines/tests/integration/github/test_ingest.py
  • egograph/pipelines/tests/unit/github/test_pipeline.py
  • egograph/pipelines/tests/unit/test_provider_entrypoints.py
  • egograph/pipelines/tests/integration/github/test_incremental.py
  • egograph/pipelines/tests/unit/test_api_dependencies.py
🚧 Files skipped from review as they are similar to previous changes (3)
  • docs/20.egograph/backend/architecture.md
  • egograph/pipelines/tests/unit/browser_history/test_youtube_metadata.py
  • egograph/pipelines/sources/browser_history/youtube_extraction.py

Comment on lines 349 to 360
query = """
    SELECT
        w.channel_id,
-       w.channel_name,
-       COUNT(*) as video_count,
-       SUM(COALESCE(v.duration_seconds, 0)) as total_seconds
+       MAX(w.channel_name) as channel_name,
+       COUNT(*) as watch_event_count,
+       COUNT(DISTINCT w.video_id) as unique_video_count
    FROM read_parquet(?) w
-   LEFT JOIN read_parquet(?) v ON w.video_id = v.video_id
    WHERE w.watched_at_utc::DATE BETWEEN ? AND ?
-   GROUP BY w.channel_id, w.channel_name
-   ORDER BY total_seconds DESC
+   GROUP BY w.channel_id
+   ORDER BY watch_event_count DESC
    LIMIT ?
"""

⚠️ Potential issue | 🟡 Minor

🧩 Analysis chain

🏁 Script executed:

#!/bin/bash
# Check whether watch_events extraction / metadata persistence guarantees non-null channel_id.
fd -e py youtube_extraction
fd -e py youtube_metadata
rg -nP -C3 '\bchannel_id\b' egograph/pipelines/sources/browser_history/

Repository: endo-ly/ego-graph

Length of output: 4787


🏁 Script executed:

#!/bin/bash
# Locate ingest_pipeline.py and inspect its contents
fd --type f 'ingest_pipeline' | head -5

Repository: endo-ly/ego-graph

Length of output: 231


🏁 Script executed:

#!/bin/bash
# Inspect the related queries in youtube_queries.py
rg -n "def get_watching_stats|def get_top_videos|def get_top_channels" egograph/backend/infrastructure/database/youtube_queries.py -A 20

Repository: endo-ly/ego-graph

Length of output: 1861


🏁 Script executed:

#!/bin/bash
# Examine the enrichment logic in youtube_metadata.py in detail
cat -n egograph/pipelines/sources/browser_history/youtube_metadata.py

Repository: endo-ly/ego-graph

Length of output: 6935


🏁 Script executed:

#!/bin/bash
# Find ingest_pipeline.py
find . -type f -name "*ingest_pipeline*"

Repository: endo-ly/ego-graph

Length of output: 235


🏁 Script executed:

#!/bin/bash
# Extract the full get_watching_stats query
sed -n '199,269p' egograph/backend/infrastructure/database/youtube_queries.py

Repository: endo-ly/ego-graph

Length of output: 1927


🏁 Script executed:

#!/bin/bash
# Extract the full get_top_videos query
sed -n '270,323p' egograph/backend/infrastructure/database/youtube_queries.py

Repository: endo-ly/ego-graph

Length of output: 1432


🏁 Script executed:

#!/bin/bash
# Check the ingest_pipeline.py for YouTube extraction to see the fallback handling
cat -n egograph/pipelines/sources/browser_history/ingest_pipeline.py

Repository: endo-ly/ego-graph

Length of output: 6211


NULL channel_id events are aggregated as a "NULL channel" in multiple queries

When the pipeline runs with youtube_api_client=None, or when metadata resolution fails, watch_events are saved to parquet with channel_id still NULL (lines 97-110 of ingest_pipeline.py). Because these queries do not exclude NULLs, several problems arise:

  • get_top_channels: the NULL channel surfaces near the top of the ranking as one giant channel
  • get_watching_stats: COUNT(DISTINCT w.channel_id) counts NULL as an individual channel
  • get_top_videos: MAX(w.channel_id) mixes with NULL, giving unpredictable results

Every affected query needs WHERE w.channel_id IS NOT NULL added:

Proposed diff
     query = """
         SELECT
             w.channel_id,
             MAX(w.channel_name) as channel_name,
             COUNT(*) as watch_event_count,
             COUNT(DISTINCT w.video_id) as unique_video_count
         FROM read_parquet(?) w
         WHERE w.watched_at_utc::DATE BETWEEN ? AND ?
+          AND w.channel_id IS NOT NULL
         GROUP BY w.channel_id
         ORDER BY watch_event_count DESC
         LIMIT ?
     """
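The failure mode is easy to reproduce in any SQL engine, because GROUP BY puts NULLs into their own group. A minimal sketch with the standard-library sqlite3 module; the table and data are illustrative, not the project's schema:

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE watch_events (channel_id TEXT, video_id TEXT)")
conn.executemany(
    "INSERT INTO watch_events VALUES (?, ?)",
    [("ch1", "v1"), ("ch1", "v2"), (None, "v3"), (None, "v4"), (None, "v5")],
)

# Without the NULL filter, the NULL rows collapse into one fake "channel"
# that can dominate the ranking.
rows = conn.execute(
    "SELECT channel_id, COUNT(*) AS n FROM watch_events "
    "GROUP BY channel_id ORDER BY n DESC"
).fetchall()
assert rows[0] == (None, 3)

# Adding the filter recommended in the review removes the fake group.
filtered = conn.execute(
    "SELECT channel_id, COUNT(*) AS n FROM watch_events "
    "WHERE channel_id IS NOT NULL GROUP BY channel_id ORDER BY n DESC"
).fetchall()
assert filtered == [("ch1", 2)]
```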
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@egograph/backend/infrastructure/database/youtube_queries.py` around lines 349
- 360, Several aggregation queries in youtube_queries.py (including
get_top_channels, get_watching_stats, get_top_videos) currently include rows
with NULL channel_id, causing NULL to be aggregated as a fake channel; update
each SQL query (e.g., the query string variable used in get_top_channels shown
above) to filter out NULLs by adding "WHERE w.channel_id IS NOT NULL" (or add
"AND w.channel_id IS NOT NULL" if a WHERE clause already exists) so that
COUNT(DISTINCT w.channel_id), MAX(w.channel_id) and other aggregations exclude
NULL channel_id rows and return correct results.

Comment thread egograph/pipelines/sources/browser_history/ingest_pipeline.py Outdated
Comment on lines +112 to +126
# watch events を月次 parquet で保存
youtube_monthly = group_watch_events_by_month(youtube_events)
for (year, month), month_events in youtube_monthly.items():
saved_key = storage.save_parquet(
month_events,
year=year,
month=month,
prefix="youtube/watch_events",
)
if not saved_key:
logger.warning(
"Failed to save YouTube watch events for %d-%02d",
year,
month,
)

⚠️ Potential issue | 🟡 Minor

Partial failures in the monthly watch_events save still leave the state as events_saved

The behavior here is lopsided: the browser_history/page_views side (lines 79-89) raises RuntimeError on a save failure, while watch_events just logs a warning and moves on. The state is then finalized at line 131 with last_sync_status="events_saved", so this is a silent-failure path where derived data goes missing yet the run is recorded as a success.

Since YouTube is derived data, the intent of not halting the core browser ingest is understandable, but observability should at least be preserved in one of these ways:

  • Add youtube_events_saved: bool (or a tuple of failed months) to BrowserHistoryPipelineResult so callers are informed
  • When failures occur, switch last_sync_status to something like events_saved_partial
  • Promote the warning to an error log plus a metric (e.g. logger.error + a counter)
🐰 Proposal: surface failed months in the return value
     youtube_monthly = group_watch_events_by_month(youtube_events)
+    youtube_failed_partitions: list[tuple[int, int]] = []
     for (year, month), month_events in youtube_monthly.items():
         saved_key = storage.save_parquet(
             month_events,
             year=year,
             month=month,
             prefix="youtube/watch_events",
         )
         if not saved_key:
-            logger.warning(
-                "Failed to save YouTube watch events for %d-%02d",
-                year,
-                month,
-            )
+            logger.error(
+                "Failed to save YouTube watch events for %d-%02d",
+                year,
+                month,
+            )
+            youtube_failed_partitions.append((year, month))

Adding a field on the BrowserHistoryPipelineResult side as well makes it easier for the caller (the API handler) to check.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@egograph/pipelines/sources/browser_history/ingest_pipeline.py` around lines
112 - 126, The loop that saves monthly YouTube watch events (youtube_monthly
from group_watch_events_by_month) currently only logs warnings on individual
save_parquet failures, causing last_sync_status="events_saved" to incorrectly
mark success; change this by collecting failed months (e.g.,
failed_youtube_months list) while iterating over youtube_monthly and on any
failure call logger.error and increment a metric/counter, then return that
failure info in the pipeline result (extend BrowserHistoryPipelineResult with
youtube_events_saved: bool or failed_youtube_months: List[Tuple[int,int]]), and
set last_sync_status to a distinct value like "events_saved_partial" when
failures exist so callers can observe partial failures rather than silently
succeeding.
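The pattern the prompt describes can be sketched independently of the real pipeline. save_monthly_events, SaveOutcome, and the save callback below are hypothetical stand-ins for the ingest loop, BrowserHistoryPipelineResult, and storage.save_parquet:

```python
import logging
from dataclasses import dataclass

logger = logging.getLogger("ingest")

@dataclass(frozen=True)
class SaveOutcome:
    # Hypothetical stand-in for the extra fields proposed for
    # BrowserHistoryPipelineResult.
    last_sync_status: str
    failed_months: tuple = ()

def save_monthly_events(monthly, save):
    """Save each month's events and report partial failure instead of
    silently recording success."""
    failed = []
    for (year, month), events in monthly.items():
        # save() stands in for storage.save_parquet: returns a key, or falsy on failure.
        if not save(events, year, month):
            logger.error("Failed to save watch events for %d-%02d", year, month)
            failed.append((year, month))
    status = "events_saved_partial" if failed else "events_saved"
    return SaveOutcome(last_sync_status=status, failed_months=tuple(failed))

# Usage: a fake save() that fails for 2026-01 only.
outcome = save_monthly_events(
    {(2026, 1): [{"v": 1}], (2026, 2): [{"v": 2}]},
    save=lambda events, y, m: None if (y, m) == (2026, 1) else "key",
)
assert outcome.last_sync_status == "events_saved_partial"
assert outcome.failed_months == ((2026, 1),)
```

Callers can then branch on failed_months instead of trusting a blanket success status.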

- fix: exclude NULL channel_id from get_top_channels and unique_channel_count
- fix: replace full-width parenthesis with ASCII (RUF003)
- fix: elevate watch events save failure from warning to error log
@endo-ly
Owner Author

endo-ly commented Apr 22, 2026

Review feedback addressed (round 2)

Review comments addressed: 3 / already addressed: 1

File | Fix | Status
egograph/backend/infrastructure/database/youtube_queries.py | Added AND w.channel_id IS NOT NULL to get_top_channels so NULL channels no longer appear in the ranking; also excluded NULLs from unique_channel_count in get_watching_stats | ✅ Fixed
egograph/pipelines/sources/browser_history/ingest_pipeline.py | Replaced full-width parentheses in comments with ASCII (RUF003) | ✅ Fixed
egograph/pipelines/sources/browser_history/ingest_pipeline.py | Elevated the log level for monthly watch-event save failures from warning to error | ✅ Fixed
egograph/pipelines/sources/browser_history/ingest_pipeline.py:106 | The save_youtube_masters return-value check was already addressed in the previous commit (68167da) | ℹ️ Already addressed


@coderabbitai coderabbitai Bot left a comment


Actionable comments posted: 1

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
egograph/backend/infrastructure/database/youtube_queries.py (1)

182-195: ⚠️ Potential issue | 🟠 Major

Don't embed limit directly into the SQL

A type hint alone does not enforce limit at runtime, so a direct caller can break the SQL string. Align this with get_top_videos / get_top_channels below and use LIMIT ?.

🔒 Proposed fix
-    if limit is not None:
-        query += f"\n        LIMIT {limit}"
+    query_params: list[Any] = [partition_paths, params.start_date, params.end_date]
+    if limit is not None:
+        if limit < 1:
+            raise ValueError("limit must be a positive integer")
+        query += "\n        LIMIT ?"
+        query_params.append(limit)
@@
     return execute_query(
         params.conn,
         query,
-        [partition_paths, params.start_date, params.end_date],
+        query_params,
     )

After the fix, this check verifies that no interpolated LIMIT {limit} patterns remain.

#!/bin/bash
# Description: check that no f-string-embedded LIMIT remains around get_watch_events.
# Expect: no matches for interpolated LIMIT patterns.
rg -nP -C3 'LIMIT\s*\{limit\}|query\s*\+=\s*f["'\''].*LIMIT' egograph/backend/infrastructure/database/youtube_queries.py
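For illustration, the parameterized-LIMIT pattern looks like this; sqlite3 uses the same positional ? placeholder style as DuckDB, and the table and function names are invented for the sketch:

```python
import sqlite3

def get_events(conn, start, end, limit=None):
    # Build the optional LIMIT with a placeholder instead of an f-string,
    # mirroring the parameterized style the review recommends.
    query = "SELECT video_id FROM watch_events WHERE watched_at BETWEEN ? AND ?"
    params = [start, end]
    if limit is not None:
        if limit < 1:
            raise ValueError("limit must be a positive integer")
        query += " LIMIT ?"
        params.append(limit)
    return conn.execute(query, params).fetchall()

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE watch_events (video_id TEXT, watched_at TEXT)")
conn.executemany(
    "INSERT INTO watch_events VALUES (?, ?)",
    [("v1", "2026-01-01"), ("v2", "2026-01-02"), ("v3", "2026-01-03")],
)
assert len(get_events(conn, "2026-01-01", "2026-01-31")) == 3
assert len(get_events(conn, "2026-01-01", "2026-01-31", limit=2)) == 2
```

The driver then escapes the bound value itself, so no caller-supplied limit can alter the SQL text.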
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@egograph/backend/infrastructure/database/youtube_queries.py` around lines 182
- 195, The get_watch_events code currently injects limit directly into the SQL
string (the f-string "LIMIT {limit}"), which is unsafe; change the query to use
a parameter placeholder (e.g., "LIMIT ?") and append the limit value to the
parameters list passed to execute_query so execute_query(params.conn, query,
[partition_paths, params.start_date, params.end_date, limit]) is used; follow
the same pattern as get_top_videos/get_top_channels and update the
query-building in get_watch_events (and any related variables like
partition_paths/params) to avoid direct f-string interpolation of limit.
♻️ Duplicate comments (1)
egograph/pipelines/sources/browser_history/ingest_pipeline.py (1)

101-131: ⚠️ Potential issue | 🟠 Major

Surface YouTube derived-save failures in the state/result

The log level is now logger.error, but lines 131 and 143-149 still treat the run as a success. Callers cannot detect when some youtube/watch_events months or the master save fail, so reflect the failed partitions and the master-save outcome in BrowserHistoryPipelineResult and last_sync_status. This is the remaining part of the previous review point.

🐰 Sketch of the fix
 @dataclass(frozen=True)
 class BrowserHistoryPipelineResult:
@@
     events_saved: bool
     received_at: datetime
     compaction_targets: tuple[tuple[int, int], ...] = ()
+    youtube_masters_saved: bool | None = None
+    youtube_failed_partitions: tuple[tuple[int, int], ...] = ()
@@
     # YouTube watch event 抽出・メタデータ解決・保存
+    youtube_masters_saved: bool | None = None
+    youtube_failed_partitions: list[tuple[int, int]] = []
     youtube_events = extract_youtube_watch_events(rows)
@@
                 youtube_events, video_master, channel_master = resolved
-                if not save_youtube_masters(storage, video_master, channel_master):
-                    logger.warning(
+                youtube_masters_saved = save_youtube_masters(
+                    storage,
+                    video_master,
+                    channel_master,
+                )
+                if not youtube_masters_saved:
+                    logger.error(
                         "Failed to save YouTube master parquet; "
                         "watch events will still be saved",
                     )
@@
             if not saved_key:
                 logger.error(
                     "Failed to save YouTube watch events for %d-%02d",
                     year,
                     month,
                 )
+                youtube_failed_partitions.append((year, month))
@@
-        last_sync_status="events_saved",
+        last_sync_status=(
+            "events_saved_partial"
+            if youtube_masters_saved is False or youtube_failed_partitions
+            else "events_saved"
+        ),
@@
         events_saved=events_saved,
         received_at=normalized_received_at,
         compaction_targets=compaction_targets,
+        youtube_masters_saved=youtube_masters_saved,
+        youtube_failed_partitions=tuple(youtube_failed_partitions),
     )

Confirm this status value can be added before wiring it in:

#!/bin/bash
# Description: check BrowserHistoryIngestState and the allowed values/usages of last_sync_status.
# Expect: if last_sync_status is an enum/Literal, also add an "events_saved_partial"-equivalent value to the schema/consumers.

rg -n -C4 '\bBrowserHistoryIngestState\b|\blast_sync_status\b|\bevents_saved_partial\b'

Also applies to: 143-149

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@egograph/pipelines/sources/browser_history/ingest_pipeline.py` around lines
101 - 131, The code currently logs YouTube master or monthly parquet save
failures but still writes a success state; update the logic so
BrowserHistoryIngestState (and the returned BrowserHistoryPipelineResult)
reflects partial failure when any save fails: track a boolean (e.g.,
had_save_failures) during the save_youtube_masters call and inside the
youtube_monthly loop (references: save_youtube_masters, storage.save_parquet,
youtube_monthly), and if any save returns falsy set last_sync_status to a new
value like "events_saved_partial" (or a configured constant) instead of
"events_saved" and include a field on BrowserHistoryPipelineResult indicating
which parts failed (e.g., masters_failed and failed_months) so callers can
detect partial failures; also ensure the new status/value is added where
BrowserHistoryIngestState and BrowserHistoryPipelineResult are
constructed/returned.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@egograph/backend/infrastructure/database/youtube_queries.py`:
- Line 44: Docstrings in youtube_queries.py use fullwidth Japanese parentheses
which trigger Ruff RUF002; locate occurrences such as the line containing
"S3パスパターン(例: s3://egograph/events/youtube/watch_events/**/*.parquet)" and the
other docstring blocks indicated (around lines 145-149, 278-282, 336) and
replace the fullwidth parentheses 「(」「)」 with ASCII "(" and ")" so the text
becomes "S3パスパターン (例: s3://egograph/events/youtube/watch_events/**/*.parquet)"
(and similarly for the other docstrings); keep all other text unchanged.
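The mechanical replacement this comment asks for can be scripted; a small sketch (the helper name and translation table are ours, not Ruff's):

```python
# Map the fullwidth parentheses flagged by Ruff RUF002/RUF003 to ASCII.
FULLWIDTH_PARENS = str.maketrans({"（": "(", "）": ")"})

def normalize_parens(text: str) -> str:
    """Replace fullwidth parentheses with their ASCII equivalents,
    leaving all other characters (including other Japanese text) intact."""
    return text.translate(FULLWIDTH_PARENS)

assert normalize_parens("（例: s3://bucket/path）") == "(例: s3://bucket/path)"
assert normalize_parens("no change") == "no change"
```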

---

Outside diff comments:
In `@egograph/backend/infrastructure/database/youtube_queries.py`:
- Around line 182-195: The get_watch_events code currently injects limit
directly into the SQL string (the f-string "LIMIT {limit}"), which is unsafe;
change the query to use a parameter placeholder (e.g., "LIMIT ?") and append the
limit value to the parameters list passed to execute_query so
execute_query(params.conn, query, [partition_paths, params.start_date,
params.end_date, limit]) is used; follow the same pattern as
get_top_videos/get_top_channels and update the query-building in
get_watch_events (and any related variables like partition_paths/params) to
avoid direct f-string interpolation of limit.

---

Duplicate comments:
In `@egograph/pipelines/sources/browser_history/ingest_pipeline.py`:
- Around line 101-131: The code currently logs YouTube master or monthly parquet
save failures but still writes a success state; update the logic so
BrowserHistoryIngestState (and the returned BrowserHistoryPipelineResult)
reflects partial failure when any save fails: track a boolean (e.g.,
had_save_failures) during the save_youtube_masters call and inside the
youtube_monthly loop (references: save_youtube_masters, storage.save_parquet,
youtube_monthly), and if any save returns falsy set last_sync_status to a new
value like "events_saved_partial" (or a configured constant) instead of
"events_saved" and include a field on BrowserHistoryPipelineResult indicating
which parts failed (e.g., masters_failed and failed_months) so callers can
detect partial failures; also ensure the new status/value is added where
BrowserHistoryIngestState and BrowserHistoryPipelineResult are
constructed/returned.

ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: dd5ded90-e623-4622-ac0e-351215cfacc5

📥 Commits

Reviewing files that changed from the base of the PR and between 68167da and 3d35e46.

📒 Files selected for processing (2)
  • egograph/backend/infrastructure/database/youtube_queries.py
  • egograph/pipelines/sources/browser_history/ingest_pipeline.py

Comment thread egograph/backend/infrastructure/database/youtube_queries.py Outdated

@coderabbitai coderabbitai Bot left a comment


Actionable comments posted: 4

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
egograph/pipelines/tests/integration/google_activity/test_pipeline.py (1)

220-230: ⚠️ Potential issue | 🟡 Minor

mock_api_client is reassigned three times in a row 🐰

Inside test_pipeline_with_multiple_months, the same mock_api_client = MagicMock() plus get_videos / get_channels mock setup is written three times, likely a copy-paste accident. Everything except the last one is redundant; behavior is unchanged, but it confuses readers, so let's delete the extras.

🐇 Proposed fix
     mock_api_client = MagicMock()
     mock_api_client.get_videos.return_value = []
     mock_api_client.get_channels.return_value = []
 
-    mock_api_client = MagicMock()
-    mock_api_client.get_videos.return_value = []
-    mock_api_client.get_channels.return_value = []
-
-    mock_api_client = MagicMock()
-    mock_api_client.get_videos.return_value = []
-    mock_api_client.get_channels.return_value = []
-
     # Act
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@egograph/pipelines/tests/integration/google_activity/test_pipeline.py` around
lines 220 - 230, In test_pipeline_with_multiple_months remove the duplicate
reassignments of mock_api_client and redundant mock setups: keep a single
mock_api_client = MagicMock() and set mock_api_client.get_videos.return_value =
[] and mock_api_client.get_channels.return_value = [] once (remove the two extra
repeated blocks) so the test uses one clearly-initialized mock; verify any later
uses reference that single mock_api_client.
🧹 Nitpick comments (7)
docs/20.egograph/pipelines/youtube.md (1)

273-275: Section numbering skips ahead

Section 7 is followed immediately by section 11. If it's a deliberate gap from the template, leaving it is fine, but filling in sections 8-10 or closing up the numbering would be kinder to readers.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@docs/20.egograph/pipelines/youtube.md` around lines 273 - 275, The section
numbering jumps from section 7 to "## 11. 実装時の考慮事項"; update the header to
restore sequential numbering (e.g., change "## 11. 実装時の考慮事項" to "## 8.
実装時の考慮事項") or insert placeholder sections 8–10 if those are intentionally
reserved, ensuring the header text "## 11. 実装時の考慮事項" is adjusted so the
document's section sequence is contiguous and reader-friendly.
egograph/pipelines/sources/youtube/api_client.py (1)

60-69: Add jitter to the backoff

A fixed wait of RETRY_BACKOFF_FACTOR**attempt tends to synchronize and spike when multiple syncs run at once. Adding jitter, e.g. time.sleep(wait_time + random.uniform(0, wait_time * 0.1)), is kinder to the API side as well.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@egograph/pipelines/sources/youtube/api_client.py` around lines 60 - 69, The
fixed backoff should add randomized jitter to avoid synchronized spikes: in the
retry block where wait_time = RETRY_BACKOFF_FACTOR**attempt is computed (around
the logger.warning and time.sleep), compute a jittered_sleep = wait_time +
random.uniform(0, wait_time * 0.1) and call time.sleep(jittered_sleep) (and
update the logged wait time in the logger.warning to show the actual
jittered_sleep); also ensure random is imported at the top of the module. This
change affects the retry logic using attempt, MAX_RETRIES, RETRY_BACKOFF_FACTOR,
logger.warning, and time.sleep.
egograph/pipelines/tests/unit/youtube/test_pipeline.py (1)

41-108: Verifying metadata resolution would make this happy path stronger 🐇

With the current assertions, the test can pass on watch_event_count == 1 and save_watch_events alone, even if API metadata resolution and master saving are skipped entirely. Since fake_client and the master-save mocks are already set up, also checking the enriched rows and master saves adds confidence.

🧪 Suggested additional assertions
     assert result["status"] == "succeeded"
     assert result["watch_event_count"] == 1
     fake_storage.save_watch_events.assert_called_once()
+    saved_rows = fake_storage.save_watch_events.call_args.args[0]
+    assert saved_rows[0]["video_title"] == "API Video 1"
+    assert saved_rows[0]["channel_id"] == "channel-1"
+    assert saved_rows[0]["channel_name"] == "Channel 1"
+    fake_client.get_videos.assert_called_once()
+    fake_client.get_channels.assert_called_once()
+    fake_storage.save_video_master.assert_called_once()
+    fake_storage.save_channel_master.assert_called_once()
     fake_storage.mark_sync_processed.assert_called_once()
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@egograph/pipelines/tests/unit/youtube/test_pipeline.py` around lines 41 -
108, Update the test_run_youtube_ingest_processes_browser_history_sync test to
assert that metadata enrichment and master saves actually happened: check that
the fake_client methods (get_videos and get_channels) were called, and assert
fake_storage.save_video_master and fake_storage.save_channel_master were each
called once with expected results (or at least non-empty/parquet key return
values); also verify the saved watch events contain enriched fields (e.g. video
title/channel id) by inspecting the argument passed to
fake_storage.save_watch_events; keep using the existing run_youtube_ingest/_run
invocation and MagicMock objects to locate these calls.
egograph/pipelines/sources/youtube/metadata.py (1)

73-89: Completely silent API failures are a bit dangerous

Catching everything with except Exception: and returning None means the caller (pipeline.py L111-114) skips enrichment and carries on: masters are never saved, and watch events are baked into parquet with channel_id still NULL. Quota exhaustion, expired credentials, and network outages all go unnoticed. At minimum, leave a log. The BLE001 static-analysis warning also originates here.

🛠️ Proposed fix
+import logging
+
+logger = logging.getLogger(__name__)
+
 ...
     try:
         api_videos = api_client.get_videos(video_ids)
         video_master = build_video_master_rows(api_videos, content_type_map)
         channel_ids = sorted(
             {
                 row["channel_id"]
                 for row in video_master
                 if isinstance(row.get("channel_id"), str)
             }
         )
         api_channels = api_client.get_channels(channel_ids) if channel_ids else []
-    except Exception:
+    except Exception:
+        logger.exception(
+            "Failed to resolve YouTube metadata; continuing without enrichment "
+            "(video_ids=%d)",
+            len(video_ids),
+        )
         return None

To lean further toward safety, narrowing the catch to the concrete exception types the API client raises (HTTP and quota errors) would be even better.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@egograph/pipelines/sources/youtube/metadata.py` around lines 73 - 89, The
try/except around api_client.get_videos / api_client.get_channels is swallowing
all errors and returning None; change the bare "except Exception:" to catch/log
the exception (e.g., "except Exception as e: logger.error('YouTube metadata
fetch failed', exc_info=e)" or use logger.exception) so the failure is recorded,
and then either re-raise or return a safe fallback instead of plain None; narrow
the catch to specific API/network/auth exceptions where possible (wrap the calls
to api_client.get_videos, api_client.get_channels inside this improved error
handling in the same block that currently returns None).
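A minimal sketch of the logged fallback; resolve_metadata and the fetch callback are hypothetical stand-ins for the module's real functions and API client:

```python
import logging

logger = logging.getLogger("youtube_metadata")

def resolve_metadata(fetch, video_ids):
    """Return fetched metadata, or None after logging the failure,
    instead of swallowing the exception silently."""
    try:
        return fetch(video_ids)
    except Exception:
        # logger.exception records the full traceback, so quota, auth,
        # and network failures become visible in the logs.
        logger.exception(
            "Failed to resolve YouTube metadata; continuing without "
            "enrichment (video_ids=%d)", len(video_ids),
        )
        return None

def boom(ids):
    raise RuntimeError("quota exceeded")

assert resolve_metadata(boom, ["v1", "v2"]) is None
assert resolve_metadata(lambda ids: {"n": len(ids)}, ["v1"]) == {"n": 1}
```

The pipeline still degrades gracefully, but the failure is recorded rather than invisible.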
egograph/pipelines/sources/youtube/storage.py (1)

100-113: Loading whole parquet files into memory needs operational care

After iterating Contents from the paginator, each object is fully loaded via response["Body"].read() → BytesIO → pd.read_parquet. As the monthly Browser History partitions grow, the per-file memory footprint grows accordingly, and rows.extend then converts everything to dicts and accumulates it, so this structure can easily pressure the memory of the sync job's runtime environment.

It works for now, but once per-partition row counts increase, pre-filtering on sync_id with pyarrow's filters= or switching to streaming reads would be safer. Noting this purely as an operational reminder.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@egograph/pipelines/sources/youtube/storage.py` around lines 100 - 113, The
current loop in the paginator uses self.s3.get_object -> response["Body"].read()
-> BytesIO -> pd.read_parquet and then rows.extend, which loads each parquet
file fully into memory; change this to filter or stream at read-time to avoid
full-file materialization: when iterating keys in the paginator (the code
referencing paginator, self.s3.get_object, pd.read_parquet, rows.extend and
sync_id) replace the full-read approach with a parquet reader that applies a
pushdown filter on sync_id (e.g. pyarrow dataset or pd.read_parquet with
engine='pyarrow' and filters=[('sync_id','==', sync_id)]) or use pyarrow's
streaming/row-group iteration to yield only matching rows before converting to
dicts, so you never load whole file contents into memory and still append only
matching records to rows.
egograph/backend/tests/unit/repositories/test_youtube_repository.py (1)

39-59: Sharing a _patch_youtube_paths-style helper would tidy this up

The exact same triple patch is copy-pasted four times across test_get_watch_events / test_get_watching_stats / test_get_top_videos / test_get_top_channels. test_youtube_queries.py already has a _patch_youtube_paths(youtube_with_sample_data) helper, so extracting it into a shared test module (conftest or helpers) and reusing it from both places would keep things DRY. Chill mode, so no rush, but maintenance tends to mean fixing four places at once; keep it in mind.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@egograph/backend/tests/unit/repositories/test_youtube_repository.py` around
lines 39 - 59, Extract the repeated ExitStack + three patch calls for
_generate_partition_paths, get_videos_parquet_path, and
get_channels_parquet_path into a shared test helper named
_patch_youtube_paths(youtube_with_sample_data) (place it in conftest.py or a
tests/helpers module), have it return a context manager that applies the same
three patches using the youtube_with_sample_data paths, then replace the
duplicated blocks in test_get_watch_events, test_get_watching_stats,
test_get_top_videos, and test_get_top_channels (and the other test file that
already has _patch_youtube_paths) to call this helper instead of copying the
three patch calls inline.
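A sketch of such a shared helper using contextlib.ExitStack; the queries namespace here is a toy stand-in for the real module whose three helpers the tests patch:

```python
from contextlib import ExitStack, contextmanager
from types import SimpleNamespace
from unittest import mock

# Toy stand-in for the real queries module whose three path helpers
# the repository tests patch.
queries = SimpleNamespace(
    _generate_partition_paths=lambda *a: ["real"],
    get_videos_parquet_path=lambda: "real",
    get_channels_parquet_path=lambda: "real",
)

@contextmanager
def patch_youtube_paths(partitions, videos, channels):
    """Apply the three patches the four repository tests repeat inline."""
    with ExitStack() as stack:
        stack.enter_context(mock.patch.object(
            queries, "_generate_partition_paths", return_value=partitions))
        stack.enter_context(mock.patch.object(
            queries, "get_videos_parquet_path", return_value=videos))
        stack.enter_context(mock.patch.object(
            queries, "get_channels_parquet_path", return_value=channels))
        yield

# Usage: all three patches apply inside the block and are undone on exit.
with patch_youtube_paths(["p1"], "v.parquet", "c.parquet"):
    assert queries.get_videos_parquet_path() == "v.parquet"
assert queries.get_videos_parquet_path() == "real"
```

Each test then needs a single with patch_youtube_paths(...) line instead of three inline patch calls.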
egograph/pipelines/sources/youtube/extraction.py (1)

42-48: Handling of youtu.be URLs with extra path segments is slightly off

Given a non-standard URL like https://youtu.be/ABCDEF/extra, path.lstrip("/") returns "ABCDEF/extra" unchanged, so the video_id ends up containing a slash. normalize_youtube_url has the same issue and would build https://www.youtube.com/watch?v=ABCDEF/extra. It's an extreme case rather than a fatal one, but taking only the leading segment is safer.

🛠️ Proposed fix
     if host in ("youtu.be", "www.youtu.be"):
-        video_id = path.lstrip("/")
+        video_id = path.lstrip("/").split("/", 1)[0]
         return video_id or None

Aligning normalize_youtube_url to the same leading-segment extraction is a good idea too.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@egograph/pipelines/sources/youtube/extraction.py` around lines 42 - 48, In
extraction.py the youtu.be branch uses video_id = path.lstrip("/") which returns
"ABCDEF/extra" for URLs like https://youtu.be/ABCDEF/extra; change this to take
only the first path segment (e.g., video_id = path.lstrip("/").split("/", 1)[0]
or equivalent) so only the actual ID is returned, and apply the same
single-segment logic inside normalize_youtube_url when constructing /watch?v=
values so both functions consistently use the leading segment as the video ID.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@egograph/backend/infrastructure/database/youtube_queries.py`:
- Around line 156-180: get_watch_events currently omits LIMIT when limit is None
which can return unbounded rows; change the SQL to always include a bounded
LIMIT using COALESCE for the bound (e.g. "LIMIT COALESCE(?, 100000)") and always
append the limit parameter to query_params (allowing NULL to fall back to the
COALESCE default) so the execute_query call from get_watch_events will never run
an unbounded query; update the query string construction in get_watch_events
(and consider extracting the numeric upper bound to a MAX_LIMIT constant) and
ensure query_params is appended to before calling execute_query.
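The always-bounded query could look like this rough sketch; the table name, SELECT clause, and MAX_LIMIT value are placeholders, not the module's actual code:

```python
MAX_LIMIT = 100_000  # hypothetical upper bound; worth extracting to a module constant

def build_watch_events_query(limit):
    """Build a query whose LIMIT is always bounded.

    Binding None as the parameter makes COALESCE fall back to MAX_LIMIT,
    so the query can never run unbounded.
    """
    query = (
        "SELECT * FROM watch_events "        # placeholder SELECT
        "ORDER BY started_at_utc DESC "
        f"LIMIT COALESCE(?, {MAX_LIMIT})"
    )
    query_params = [limit]  # always appended, even when limit is None
    return query, query_params
```

Calling it with `limit=None` still yields a bounded LIMIT clause, so execute_query never sees an unbounded query.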

In `@egograph/pipelines/api/browser_history.py`:
- Around line 31-49: When enqueue_browser_history_compact succeeds but
enqueue_youtube_ingest can raise, wrap the call to enqueue_youtube_ingest (where
youtube_run is set) in a try/except so that a failure does not turn the whole
handler into a 500; always return the compact run info (run.run_id) and
202-style accepted response, and on exception set youtube_run to None, add an
error field to the response (e.g., youtube_error) and log the exception via the
same logger. Specifically, keep the call to enqueue_browser_history_compact and
its produced run untouched, change the enqueue_youtube_ingest invocation into a
protected block that catches exceptions, assigns youtube_run = None on error,
logs the exception, and ensures the returned dict still includes "run_id":
run.run_id, "youtube_run_id": youtube_run.run_id if youtube_run else None and a
youtube_error message when an exception occurred.
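As a sketch of the suggested shape — the function and parameter names are illustrative, and the real handler in browser_history.py calls the enqueue functions directly rather than receiving them:

```python
import logging

logger = logging.getLogger(__name__)

def handle_upload(enqueue_compact, enqueue_youtube):
    """Compact run failures still propagate; YouTube enqueue is best-effort."""
    run = enqueue_compact()
    youtube_run = None
    youtube_error = None
    try:
        youtube_run = enqueue_youtube()
    except Exception as exc:  # degrade gracefully instead of returning a 500
        logger.exception("YouTube ingest enqueue failed")
        youtube_error = str(exc)
    body = {
        "run_id": run.run_id,
        "youtube_run_id": youtube_run.run_id if youtube_run else None,
    }
    if youtube_error:
        body["youtube_error"] = youtube_error
    return body
```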

In `@egograph/pipelines/sources/youtube/api_client.py`:
- Around line 37-74: In the YouTube API request retry block, don't treat all 403
responses as retryable: inside the 403 handling in api_client.py (the code that
inspects response.json() and raises QuotaExceededError for errors with reason
"quotaExceeded"), detect non-quota 403s and immediately raise the HTTP error
instead of falling through to the outer retry logic; specifically, after parsing
the 403 JSON, if you do not find an error with reason "quotaExceeded" call
response.raise_for_status() (or raise requests.HTTPError) so non-quota 4xx/403
responses are not retried by the outer loop that uses MAX_RETRIES and logger.
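One way to sketch the decision — the payload shape follows the YouTube Data API's standard error format, and the function name is made up for illustration:

```python
def classify_403(status_code, payload):
    """Decide how the retry loop should treat a response.

    Only a quotaExceeded 403 is retryable; any other 403 should raise
    immediately (e.g. via response.raise_for_status()) instead of falling
    through to the MAX_RETRIES loop.
    """
    if status_code != 403:
        return "not_403"
    errors = payload.get("error", {}).get("errors", [])
    if any(e.get("reason") == "quotaExceeded" for e in errors):
        return "retryable_quota"
    return "fail_fast"
```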

In `@egograph/pipelines/sources/youtube/pipeline.py`:
- Around line 59-77: In _parse_request, strengthen validation so malformed
payloads are skipped: ensure sync_id is a non-empty string (e.g., check
sync_id.strip() not empty) and validate each month/year before adding to
target_months (month must be an int in 1..12 and year must be a positive
reasonable int); if any entry fails validation, skip that entry and if no valid
months remain return None so YouTubeIngestRequest is never created with empty
sync_id or invalid months. Use the existing raw_months, target_months, and
YouTubeIngestRequest symbols to locate and update the checks.
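A sketch of the stricter parsing, under the assumption that raw_months entries are dicts with year/month keys (the real field names and request shape may differ):

```python
def parse_target_months(sync_id, raw_months):
    """Return (sync_id, months) only when the payload is usable, else None."""
    if not isinstance(sync_id, str) or not sync_id.strip():
        return None
    months = []
    for entry in raw_months or []:
        year, month = entry.get("year"), entry.get("month")
        if not (isinstance(year, int) and year > 0):
            continue  # skip malformed entries rather than failing the run
        if not (isinstance(month, int) and 1 <= month <= 12):
            continue
        months.append((year, month))
    if not months:
        return None  # never build a YouTubeIngestRequest with no valid months
    return sync_id, months
```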

---

Outside diff comments:
In `@egograph/pipelines/tests/integration/google_activity/test_pipeline.py`:
- Around line 220-230: In test_pipeline_with_multiple_months remove the
duplicate reassignments of mock_api_client and redundant mock setups: keep a
single mock_api_client = MagicMock() and set
mock_api_client.get_videos.return_value = [] and
mock_api_client.get_channels.return_value = [] once (remove the two extra
repeated blocks) so the test uses one clearly-initialized mock; verify any later
uses reference that single mock_api_client.

---

Nitpick comments:
In `@docs/20.egograph/pipelines/youtube.md`:
- Around line 273-275: The section numbering jumps from section 7 to "## 11.
実装時の考慮事項"; update the header to restore sequential numbering (e.g., change "##
11. 実装時の考慮事項" to "## 8. 実装時の考慮事項") or insert placeholder sections 8–10 if those
are intentionally reserved, ensuring the header text "## 11. 実装時の考慮事項" is
adjusted so the document's section sequence is contiguous and reader-friendly.

In `@egograph/backend/tests/unit/repositories/test_youtube_repository.py`:
- Around line 39-59: Extract the repeated ExitStack + three patch calls for
_generate_partition_paths, get_videos_parquet_path, and
get_channels_parquet_path into a shared test helper named
_patch_youtube_paths(youtube_with_sample_data) (place it in conftest.py or a
tests/helpers module), have it return a context manager that applies the same
three patches using the youtube_with_sample_data paths, then replace the
duplicated blocks in test_get_watch_events, test_get_watching_stats,
test_get_top_videos, and test_get_top_channels (and the other test file that
already has _patch_youtube_paths) to call this helper instead of copying the
three patch calls inline.

In `@egograph/pipelines/sources/youtube/api_client.py`:
- Around line 60-69: The fixed backoff should add randomized jitter to avoid
synchronized spikes: in the retry block where wait_time =
RETRY_BACKOFF_FACTOR**attempt is computed (around the logger.warning and
time.sleep), compute a jittered_sleep = wait_time + random.uniform(0, wait_time
* 0.1) and call time.sleep(jittered_sleep) (and update the logged wait time in
the logger.warning to show the actual jittered_sleep); also ensure random is
imported at the top of the module. This change affects the retry logic using
attempt, MAX_RETRIES, RETRY_BACKOFF_FACTOR, logger.warning, and time.sleep.
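The jittered backoff could be factored like this; RETRY_BACKOFF_FACTOR's value here is assumed, and the injectable `sleep` parameter exists only to make the sketch testable:

```python
import random
import time

RETRY_BACKOFF_FACTOR = 2  # assumed value; mirror the module constant

def sleep_with_jitter(attempt, sleep=time.sleep):
    """Exponential backoff plus up to 10% random jitter to desynchronize retries."""
    wait_time = RETRY_BACKOFF_FACTOR ** attempt
    jittered = wait_time + random.uniform(0, wait_time * 0.1)
    sleep(jittered)  # log `jittered`, not `wait_time`, so records match reality
    return jittered
```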

In `@egograph/pipelines/sources/youtube/extraction.py`:
- Around line 42-48: In extraction.py the youtu.be branch uses video_id =
path.lstrip("/") which returns "ABCDEF/extra" for URLs like
https://youtu.be/ABCDEF/extra; change this to take only the first path segment
(e.g., video_id = path.lstrip("/").split("/", 1)[0] or equivalent) so only the
actual ID is returned, and apply the same single-segment logic inside
normalize_youtube_url when constructing /watch?v= values so both functions
consistently use the leading segment as the video ID.

In `@egograph/pipelines/sources/youtube/metadata.py`:
- Around line 73-89: The try/except around api_client.get_videos /
api_client.get_channels is swallowing all errors and returning None; change the
bare "except Exception:" to catch/log the exception (e.g., "except Exception as
e: logger.error('YouTube metadata fetch failed', exc_info=e)" or use
logger.exception) so the failure is recorded, and then either re-raise or return
a safe fallback instead of plain None; narrow the catch to specific
API/network/auth exceptions where possible (wrap the calls to
api_client.get_videos, api_client.get_channels inside this improved error
handling in the same block that currently returns None).

In `@egograph/pipelines/sources/youtube/storage.py`:
- Around line 100-113: The current loop in the paginator uses self.s3.get_object
-> response["Body"].read() -> BytesIO -> pd.read_parquet and then rows.extend,
which loads each parquet file fully into memory; change this to filter or stream
at read-time to avoid full-file materialization: when iterating keys in the
paginator (the code referencing paginator, self.s3.get_object, pd.read_parquet,
rows.extend and sync_id) replace the full-read approach with a parquet reader
that applies a pushdown filter on sync_id (e.g. pyarrow dataset or
pd.read_parquet with engine='pyarrow' and filters=[('sync_id','==', sync_id)])
or use pyarrow's streaming/row-group iteration to yield only matching rows
before converting to dicts, so you never load whole file contents into memory
and still append only matching records to rows.

In `@egograph/pipelines/tests/unit/youtube/test_pipeline.py`:
- Around line 41-108: Update the
test_run_youtube_ingest_processes_browser_history_sync test to assert that
metadata enrichment and master saves actually happened: check that the
fake_client methods (get_videos and get_channels) were called, and assert
fake_storage.save_video_master and fake_storage.save_channel_master were each
called once with expected results (or at least non-empty/parquet key return
values); also verify the saved watch events contain enriched fields (e.g. video
title/channel id) by inspecting the argument passed to
fake_storage.save_watch_events; keep using the existing run_youtube_ingest/_run
invocation and MagicMock objects to locate these calls.

ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: c1d12050-8a9d-4914-b81c-6057f7e818a2

📥 Commits

Reviewing files that changed from the base of the PR and between 3d35e46 and 6b34b9c.

📒 Files selected for processing (30)
  • docs/20.egograph/pipelines/youtube.md
  • egograph/backend/infrastructure/database/youtube_queries.py
  • egograph/backend/tests/conftest.py
  • egograph/backend/tests/unit/repositories/test_youtube_queries.py
  • egograph/backend/tests/unit/repositories/test_youtube_repository.py
  • egograph/pipelines/api/browser_history.py
  • egograph/pipelines/service.py
  • egograph/pipelines/sources/browser_history/__init__.py
  • egograph/pipelines/sources/browser_history/pipeline.py
  • egograph/pipelines/sources/browser_history/transform.py
  • egograph/pipelines/sources/google_activity/pipeline.py
  • egograph/pipelines/sources/google_activity/transform.py
  • egograph/pipelines/sources/google_activity/youtube_api.py
  • egograph/pipelines/sources/youtube/__init__.py
  • egograph/pipelines/sources/youtube/api_client.py
  • egograph/pipelines/sources/youtube/canonical.py
  • egograph/pipelines/sources/youtube/config.py
  • egograph/pipelines/sources/youtube/extraction.py
  • egograph/pipelines/sources/youtube/metadata.py
  • egograph/pipelines/sources/youtube/pipeline.py
  • egograph/pipelines/sources/youtube/storage.py
  • egograph/pipelines/tests/integration/google_activity/test_pipeline.py
  • egograph/pipelines/tests/unit/browser_history/test_pipeline_entrypoint.py
  • egograph/pipelines/tests/unit/browser_history/test_youtube_extraction.py
  • egograph/pipelines/tests/unit/browser_history/test_youtube_metadata.py
  • egograph/pipelines/tests/unit/google_activity/test_youtube_api.py
  • egograph/pipelines/tests/unit/test_api.py
  • egograph/pipelines/tests/unit/test_provider_entrypoints.py
  • egograph/pipelines/tests/unit/youtube/test_pipeline.py
  • egograph/pipelines/workflows/registry.py
💤 Files with no reviewable changes (1)
  • egograph/pipelines/sources/google_activity/youtube_api.py
✅ Files skipped from review due to trivial changes (3)
  • egograph/pipelines/sources/google_activity/pipeline.py
  • egograph/pipelines/sources/youtube/config.py
  • egograph/pipelines/tests/unit/browser_history/test_pipeline_entrypoint.py
🚧 Files skipped from review as they are similar to previous changes (4)
  • egograph/pipelines/tests/unit/test_api.py
  • egograph/pipelines/tests/unit/browser_history/test_youtube_metadata.py
  • egograph/pipelines/tests/unit/browser_history/test_youtube_extraction.py
  • egograph/pipelines/tests/unit/google_activity/test_youtube_api.py

@coderabbitai coderabbitai Bot left a comment

Actionable comments posted: 5

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
docs/20.egograph/pipelines/youtube.md (1)

295-325: ⚠️ Potential issue | 🟡 Minor

Please include the sync_id fields in the samples too

Lines 80 and 144 define them as required/persisted columns, yet they're missing from the sample JSON. If this doc is the source of truth, an implementer could read it as saying it's fine to ingest without sync_id.

🐰 Suggested fix
 {
+  "sync_id": "sync_123",
   "page_view_id": "pv_123",
   "started_at_utc": "2026-04-21T12:00:00Z",
   "url": "https://www.youtube.com/watch?v=dQw4w9WgXcQ",
@@
   "source": "browser_history",
   "source_event_id": "pv_123",
   "source_device": "desktop-main",
-  "ingested_at_utc": "2026-04-21T12:10:00Z"
+  "ingested_at_utc": "2026-04-21T12:10:00Z",
+  "browser_history_sync_id": "sync_123"
 }
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@docs/20.egograph/pipelines/youtube.md` around lines 295 - 325, the sample
JSON is missing the required sync_id fields; add the sync_id fields that the
schema defines as required/persisted (e.g. sync_id and related sync_* fields)
to both the input example (Browser History Page View) and the Parquet row
example (Watch Event), stating each field name and its expected value format
(include concrete sample values consistent with the schema definition). The
targets are the page view JSON under heading "12.1 入力データ例 (Browser History
Page View)" and the watch_event JSON under "12.2 Parquet行例 (Watch Event)"; add
the sync_id fields to each object and fill in sample values.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@egograph/backend/infrastructure/database/youtube_queries.py`:
- Around line 101-104: Replace the incorrect week-format "%Y-W%V" (calendar year
+ ISO week) with the ISO-year format "%G-W%V" wherever used (e.g., the week
formatting in youtube_queries.py, github_queries.py and queries.py for Spotify
stats) so year-crossing weeks map to the ISO year; additionally harden
_generate_partition_paths so it does not blindly return every month in
range—either filter the generated partition paths to include only those that
actually exist (use a filesystem/S3 existence check) or add a safe fallback that
allows DuckDB-style globs or skips missing months to avoid failures when month
partitions are not yet written.
- Around line 170-185: date_format_map currently maps "week" to "%Y-W%V", which
yields incorrect ISO week labels around year boundaries; update
date_format_map["week"] to "%G-W%V" so the strftime call in the query (the
strftime(..., '{date_format_map[granularity]}') inside the SQL built with
_latest_master_ctes()) uses the ISO year specifier, and add/adjust unit tests to
cover the "week" granularity to prevent regressions.

In `@egograph/pipelines/sources/youtube/api_client.py`:
- Around line 103-109: The debug log currently emits raw identifiers via
batch_ids which may reveal watched videos/channels; update the logger.debug call
in api_client.py (the place that uses logger.debug with endpoint, batch_ids and
YOUTUBE_API_BATCH_SIZE) to stop logging ",".join(batch_ids) and instead log only
the batch size or count (e.g., len(batch_ids)) along with endpoint and batch
index; ensure the unique symbols touched are the logger.debug invocation,
batch_ids variable, and YOUTUBE_API_BATCH_SIZE constant so you don't
accidentally leave the raw IDs in other log statements.

In `@egograph/pipelines/sources/youtube/metadata.py`:
- Around line 49-59: This block overwrites the original event's channel_id /
channel_name with None whenever video.get("channel_id") or channel is missing;
when building the values passed to enriched.append, always fall back to the
existing event fields. Concretely, use video.get("channel_id") or
event["channel_id"] for channel_id, and for channel_name use
channel.get("channel_name") when channel exists, otherwise
video.get("channel_name") or event["channel_name"] in that order, so existing
data is never overwritten with None (see: channels_by_id, video, event,
enriched.append).
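The fallback rule can be sketched as a small pure function; the names mirror the comment above, while the real code builds these values inline in the enrichment loop:

```python
def enrich_event(event, video=None, channel=None):
    """Never let a missing master record overwrite existing event fields with None."""
    video = video or {}
    channel = channel or {}
    return {
        **event,
        "channel_id": video.get("channel_id") or event.get("channel_id"),
        "channel_name": (
            channel.get("channel_name")
            or video.get("channel_name")
            or event.get("channel_name")
        ),
    }
```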

In `@egograph/pipelines/sources/youtube/storage.py`:
- Around line 162-188: save_video_master and save_channel_master perform a
read->merge->write to a fixed key which causes lost updates under concurrent
workers; change the strategy to avoid RMW races by either (A) writing
append/partitioned keys per run/day (use
build_video_master_key/build_channel_master_key to include run_id or date and
update load_video_master/load_channel_master to union/merge those partitions) or
(B) implement object-level concurrency control around _save_dataframe_key (use
S3 conditional PUT/If-Match with the previous ETag from load_video_master and
retry on precondition failure, or add an explicit distributed lock) and keep
_merge_master_rows logic unchanged; pick one approach and add retries/backoff
for optimistic failures.
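Option B (optimistic concurrency) could be sketched storage-agnostically as below. Whether the S3 endpoint in use actually supports conditional PUT with If-Match is an assumption to verify, so the store interaction is abstracted behind load/save callables:

```python
class PreconditionFailed(Exception):
    """Raised by `save` when the object's ETag no longer matches."""

def save_master_with_cas(load, save, merge, new_rows, max_attempts=5):
    """Optimistic read-merge-write: retry when another worker wrote in between.

    `load` returns (rows, etag); `save(rows, etag)` raises PreconditionFailed
    on an ETag mismatch (e.g. an S3 conditional PUT rejected with 412).
    """
    for _attempt in range(max_attempts):
        rows, etag = load()
        merged = merge(rows, new_rows)  # e.g. the existing _merge_master_rows
        try:
            save(merged, etag)
            return merged
        except PreconditionFailed:
            continue  # another worker won the race; reload and merge again
    raise RuntimeError("master save kept losing the CAS race")
```

A backoff between attempts (as in the API client's retry loop) would be a natural addition.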

---

Outside diff comments:
In `@docs/20.egograph/pipelines/youtube.md`:
- Around line 295-325: The sample JSON is missing the required sync_id fields;
add the sync_id fields that the schema defines as required/persisted (e.g.
sync_id and related sync_* fields) to both the input example (Browser History
Page View) and the Parquet row example (Watch Event), stating each field name
and its expected value format (include concrete sample values consistent with
the schema definition). The targets are the page view JSON under heading "12.1
入力データ例 (Browser History Page View)" and the watch_event JSON under "12.2
Parquet行例 (Watch Event)"; add the sync_id fields to each object and fill in
sample values.

ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: edbe91f6-d662-43a7-a191-daed554592b3

📥 Commits

Reviewing files that changed from the base of the PR and between 6b34b9c and 47df7b7.

📒 Files selected for processing (19)
  • docs/20.egograph/backend/architecture.md
  • docs/20.egograph/pipelines/youtube.md
  • egograph/backend/infrastructure/database/youtube_queries.py
  • egograph/backend/tests/fixtures/youtube.py
  • egograph/backend/tests/unit/repositories/test_youtube_queries.py
  • egograph/backend/tests/unit/repositories/test_youtube_repository.py
  • egograph/pipelines/api/browser_history.py
  • egograph/pipelines/sources/youtube/api_client.py
  • egograph/pipelines/sources/youtube/extraction.py
  • egograph/pipelines/sources/youtube/metadata.py
  • egograph/pipelines/sources/youtube/pipeline.py
  • egograph/pipelines/sources/youtube/storage.py
  • egograph/pipelines/tests/integration/google_activity/test_pipeline.py
  • egograph/pipelines/tests/unit/browser_history/test_youtube_extraction.py
  • egograph/pipelines/tests/unit/browser_history/test_youtube_metadata.py
  • egograph/pipelines/tests/unit/google_activity/test_youtube_api.py
  • egograph/pipelines/tests/unit/test_api_endpoints.py
  • egograph/pipelines/tests/unit/youtube/test_pipeline.py
  • egograph/pipelines/tests/unit/youtube/test_storage.py
✅ Files skipped from review due to trivial changes (1)
  • egograph/pipelines/tests/unit/browser_history/test_youtube_metadata.py
🚧 Files skipped from review as they are similar to previous changes (7)
  • egograph/pipelines/tests/integration/google_activity/test_pipeline.py
  • docs/20.egograph/backend/architecture.md
  • egograph/pipelines/api/browser_history.py
  • egograph/pipelines/tests/unit/youtube/test_pipeline.py
  • egograph/pipelines/tests/unit/browser_history/test_youtube_extraction.py
  • egograph/pipelines/sources/youtube/extraction.py
  • egograph/pipelines/sources/youtube/pipeline.py

@endo-ly endo-ly merged commit 59ebfb9 into main Apr 22, 2026
3 checks passed
@endo-ly endo-ly added enhancement New feature or request backend labels Apr 22, 2026