From 13be0911de6467f7899f404bc6dd1ef6e3b7568d Mon Sep 17 00:00:00 2001 From: Daniil Okhlopkov <5613295+ohld@users.noreply.github.com> Date: Sun, 5 Oct 2025 15:16:42 +0300 Subject: [PATCH] Improve resilience of Instagram parser --- src/flows/parsers/ig.py | 2 +- src/storage/parsers/ig.py | 94 +++++++++++++++++++++++---------------- 2 files changed, 57 insertions(+), 39 deletions(-) diff --git a/src/flows/parsers/ig.py b/src/flows/parsers/ig.py index 097f72b..5eb71c1 100644 --- a/src/flows/parsers/ig.py +++ b/src/flows/parsers/ig.py @@ -22,7 +22,7 @@ async def parse_ig_source( logger.info(f"Going to parse feed ig user id {instagram_user_id}") medias = await get_user_medias(instagram_user_id) - if len(medias) > 0: + if medias: await insert_parsed_posts_from_ig(meme_source_id, medias) await update_meme_source(meme_source_id=meme_source_id, parsed_at=datetime.utcnow()) diff --git a/src/storage/parsers/ig.py b/src/storage/parsers/ig.py index 0fe037b..40538fc 100644 --- a/src/storage/parsers/ig.py +++ b/src/storage/parsers/ig.py @@ -7,48 +7,50 @@ from src.storage.parsers.schemas import IgPostParsingResult -async def _get_user_info( - username: str, +HIKERAPI_BASE_URL = "https://api.hikerapi.com/v2" +HIKERAPI_HEADERS = { + "accept": "application/json", + "x-access-key": settings.HIKERAPI_TOKEN, +} + + +async def _fetch_hikerapi( # pragma: no cover - thin wrapper around httpx + endpoint: str, + *, + params: dict[str, str | int], + not_found_message: str, ) -> dict | None: - async with httpx.AsyncClient(timeout=20.0) as client: + async with httpx.AsyncClient(base_url=HIKERAPI_BASE_URL, timeout=20.0) as client: try: - response = await client.get( - "https://api.hikerapi.com/v2/user/by/username", - params={"username": username}, - headers={ - "accept": "application/json", - "x-access-key": settings.HIKERAPI_TOKEN, - }, - ) + response = await client.get(endpoint, params=params, headers=HIKERAPI_HEADERS) response.raise_for_status() except httpx.HTTPStatusError as exc: if exc.response.status_code == 404: - logging.warning( - "Instagram user '%s' not found. Skipping.", - username, - ) + logging.warning(not_found_message) return None raise + except httpx.RequestError as exc: + logging.error("Failed to reach HikerAPI endpoint %s: %s", endpoint, exc) + raise return response.json() -async def _get_user_medias( - user_id: int, -) -> dict | None: - async with httpx.AsyncClient(timeout=20.0) as client: - response = await client.get( - "https://api.hikerapi.com/v2/user/medias", - params={"user_id": user_id}, - headers={ - "accept": "application/json", - "x-access-key": settings.HIKERAPI_TOKEN, - }, - ) +async def _get_user_info(username: str) -> dict | None: + return await _fetch_hikerapi( + "/user/by/username", + params={"username": username}, + not_found_message=f"Instagram user '{username}' not found. Skipping.", + ) - response.raise_for_status() - return response.json() + +async def _get_user_medias(user_id: int) -> dict | None: + return await _fetch_hikerapi( + "/user/medias", + params={"user_id": user_id}, + not_found_message=f"Instagram user with id '{user_id}' not found. Skipping.", + ) async def get_user_info(instagram_username: str): @@ -56,23 +58,39 @@ async def get_user_info(instagram_username: str): if not user_info_response: return None - if user_info_response["status"] != "ok" or not user_info_response.get("user"): + status = user_info_response.get("status") + user = user_info_response.get("user") + if status != "ok" or not user: logging.warning( - f"Failed to get @{instagram_username} info. Result: {user_info_response}" + "Failed to get @%s info. Result: %s", + instagram_username, + user_info_response, ) return None - return user_info_response["user"] + return user -async def get_user_medias(user_id: int) -> list[IgPostParsingResult] | None: +async def get_user_medias(user_id: int) -> list[IgPostParsingResult]: user_medias_response = await _get_user_medias(user_id) - if user_medias_response["response"]["status"] != "ok": - logging.warning(f"Failed to get {user_id} medias: {user_medias_response}") - return None + if not user_medias_response: + return [] + + response_payload = user_medias_response.get("response") or {} + if response_payload.get("status") != "ok": + logging.warning("Failed to get %s medias: %s", user_id, user_medias_response) + return [] + + medias = response_payload.get("items") or [] + if not isinstance(medias, list): + logging.warning( + "Unexpected medias payload for %s: %s", + user_id, + user_medias_response, + ) + return [] - medias = user_medias_response["response"]["items"] - logging.info(f"Received {len(medias)} medias for {user_id}") + logging.info("Received %s medias for %s", len(medias), user_id) # serialize medias return [