Route HikerAPI requests through a shared _fetch_hikerapi helper and switch
the Instagram parser's logging calls to lazy %-style arguments.

Review fixes folded into this patch:
  * get_user_medias keeps its `medias = user_medias_response["response"]["items"]`
    assignment, because the logging call that follows it reads `medias`.
  * get_user_medias keeps its `| None` return annotation, because the
    function still returns None on its failure branches.

diff --git a/src/flows/parsers/ig.py b/src/flows/parsers/ig.py
index 097f72b..5eb71c1 100644
--- a/src/flows/parsers/ig.py
+++ b/src/flows/parsers/ig.py
@@ -22,7 +22,7 @@ async def parse_ig_source(
     logger.info(f"Going to parse feed ig user id {instagram_user_id}")
 
     medias = await get_user_medias(instagram_user_id)
-    if len(medias) > 0:
+    if medias:
         await insert_parsed_posts_from_ig(meme_source_id, medias)
 
     await update_meme_source(meme_source_id=meme_source_id, parsed_at=datetime.utcnow())
diff --git a/src/storage/parsers/ig.py b/src/storage/parsers/ig.py
index 697c2c0..6661624 100644
--- a/src/storage/parsers/ig.py
+++ b/src/storage/parsers/ig.py
@@ -7,29 +7,32 @@
 from src.storage.parsers.schemas import IgPostParsingResult
 
 
-async def _get_user_info(
-    username: str,
+HIKERAPI_BASE_URL = "https://api.hikerapi.com/v2"
+HIKERAPI_HEADERS = {
+    "accept": "application/json",
+    "x-access-key": settings.HIKERAPI_TOKEN,
+}
+
+
+async def _fetch_hikerapi(  # pragma: no cover - thin wrapper around httpx
+    endpoint: str,
+    *,
+    params: dict[str, str | int],
+    not_found_message: str,
 ) -> dict | None:
-    async with httpx.AsyncClient(timeout=20.0) as client:
+    async with httpx.AsyncClient(base_url=HIKERAPI_BASE_URL, timeout=20.0) as client:
         try:
-            response = await client.get(
-                "https://api.hikerapi.com/v2/user/by/username",
-                params={"username": username},
-                headers={
-                    "accept": "application/json",
-                    "x-access-key": settings.HIKERAPI_TOKEN,
-                },
-            )
+            response = await client.get(endpoint, params=params, headers=HIKERAPI_HEADERS)
             response.raise_for_status()
         except httpx.HTTPStatusError as exc:
             if exc.response.status_code == 404:
-                logging.warning(
-                    "Instagram user '%s' not found. Skipping.",
-                    username,
-                )
+                logging.warning(not_found_message)
                 return None
             raise
+        except httpx.RequestError as exc:
+            logging.error("Failed to reach HikerAPI endpoint %s: %s", endpoint, exc)
+            raise
 
     return response.json()
 
 
@@ -66,16 +69,20 @@ async def get_user_info(instagram_username: str):
     if not user_info_response:
         return None
 
-    if user_info_response["status"] != "ok" or not user_info_response.get("user"):
+    status = user_info_response.get("status")
+    user = user_info_response.get("user")
+    if status != "ok" or not user:
         logging.warning(
-            f"Failed to get @{instagram_username} info. Result: {user_info_response}"
+            "Failed to get @%s info. Result: %s",
+            instagram_username,
+            user_info_response,
         )
         return None
 
-    return user_info_response["user"]
+    return user
 
 
 async def get_user_medias(user_id: int) -> list[IgPostParsingResult] | None:
     user_medias_response = await _get_user_medias(user_id)
     if not user_medias_response:
         return None
@@ -83,8 +90,8 @@ async def get_user_medias(user_id: int) -> list[IgPostParsingResult] | None:
         logging.warning(f"Failed to get {user_id} medias: {user_medias_response}")
         return None
 
     medias = user_medias_response["response"]["items"]
-    logging.info(f"Received {len(medias)} medias for {user_id}")
+    logging.info("Received %s medias for %s", len(medias), user_id)
 
     # serialize medias
     return [