diff --git a/.env.example b/.env.example index f2ff779..b2e16ea 100644 --- a/.env.example +++ b/.env.example @@ -5,3 +5,4 @@ DB_USER=root DB_PASSWORD=your-password-here DB_NAME=telegram_bot LOG_LEVEL=INFO +SAUCENAO_KEY= \ No newline at end of file diff --git a/COMMANDS.md b/COMMANDS.md index b0dca52..6348ed9 100644 --- a/COMMANDS.md +++ b/COMMANDS.md @@ -43,7 +43,7 @@ This document provides a comprehensive list of all available commands for this T ## 🌸 Anime & Manga | Command | Usage | Description | |---------|-------|-------------| -| `/sauce` | `/sauce` (reply) | Identify anime from image/screenshot | +| `/sauce` | `/sauce [-nsfw] [-manga] [-anime]` (reply) | Identify anime, manga, or art from image/screenshots. Falls back to SauceNAO for manga. | | `/anime` | `/anime ` | Search anime info from MAL | | `/manga` | `/manga ` | Search manga info from MAL | diff --git a/bot/config.py b/bot/config.py index 9f1f465..9b44c46 100644 --- a/bot/config.py +++ b/bot/config.py @@ -1,12 +1,11 @@ import os from dataclasses import dataclass +from urllib.parse import quote_plus + from dotenv import load_dotenv load_dotenv() - -from urllib.parse import quote_plus - @dataclass(frozen=True) class Settings: bot_token: str @@ -16,6 +15,7 @@ class Settings: db_password: str db_name: str log_level: str + saucenao_key: str | None = None @property def database_url(self) -> str: @@ -34,6 +34,7 @@ def load_settings() -> Settings: db_password=os.getenv("DB_PASSWORD", ""), db_name=os.getenv("DB_NAME", "telegram_bot"), log_level=os.getenv("LOG_LEVEL", "INFO"), + saucenao_key=os.getenv("SAUCENAO_KEY"), ) diff --git a/bot/plugins/anime/info.py b/bot/plugins/anime/info.py index fe9e176..c4832a8 100644 --- a/bot/plugins/anime/info.py +++ b/bot/plugins/anime/info.py @@ -1,7 +1,9 @@ import html + import httpx from telegram import Update, InlineKeyboardButton, InlineKeyboardMarkup from telegram.ext import Application, CommandHandler, ContextTypes + from bot.logger import get_logger logger = get_logger(__name__) diff --git a/bot/plugins/anime/trace.py b/bot/plugins/anime/trace.py index 06a5ee7..6fedd4c 100644 --- a/bot/plugins/anime/trace.py +++ b/bot/plugins/anime/trace.py @@ -1,8 +1,11 @@ import html + import httpx from telegram import Update from telegram.ext import Application, CommandHandler, ContextTypes + from bot.logger import get_logger +from bot.config import settings logger = get_logger(__name__) @@ -26,60 +29,137 @@ async def trace_anime(update: Update, context: ContextTypes.DEFAULT_TYPE): photo = target_msg.document if not photo: - await message.reply_text("Please reply to an image or sticker to trace its anime source.") + await message.reply_text("Please reply to an image, video, or sticker to find its source.") return - status_msg = await message.reply_text("šŸ” Searching for anime source...") + args = context.args + allow_nsfw = "-nsfw" in args + force_manga = "-manga" in args + force_anime = "-anime" in args + force_fanart = "-fanart" in args + + status_msg = await message.reply_text("šŸ” Searching for source...") try: file = await context.bot.get_file(photo.file_id) + image_bytes = bytes(await file.download_as_bytearray()) - async with httpx.AsyncClient() as client: - image_data = await file.download_as_bytearray() - response = await client.post( - "https://api.trace.moe/search?cutBorders&anilistInfo", - files={"image": ("image.jpg", bytes(image_data), "image/jpeg")} - ) - data = response.json() - - if response.status_code != 200 or not data.get("result"): - await status_msg.edit_text("āŒ Could not find any matching anime.") + if not (force_manga or force_fanart): + res = await _search_tracemoe(image_bytes, allow_nsfw) + if res and (res["similarity"] > 85 or force_anime): + await _send_tracemoe_res(message, status_msg, res, allow_nsfw) + return + + res = await _search_saucenao(image_bytes, allow_nsfw) + if res and res["similarity"] > 60: + await _send_saucenao_res(message, status_msg, res, allow_nsfw) return - result = data["result"][0] - anilist = result.get("anilist", {}) - title_native = anilist.get("title", {}).get("native", "Unknown") - title_romaji = anilist.get("title", {}).get("romaji", "Unknown") - title_english = anilist.get("title", {}).get("english") - - episode = result.get("episode", "Movie/OVA") - similarity = round(result.get("similarity", 0) * 100, 2) + await status_msg.edit_text("āŒ Could not find a reliable source (Anime or Manga).") + + except Exception as e: + logger.error(f"Sauce search error: {e}") + await status_msg.edit_text("āŒ An error occurred while searching for the source.") + +async def _search_tracemoe(image_bytes: bytes, allow_nsfw: bool): + url = "https://api.trace.moe/search?cutBorders&anilistInfo" + async with httpx.AsyncClient() as client: + response = await client.post(url, files={"image": ("image.jpg", image_bytes, "image/jpeg")}) + if response.status_code != 200: + return None + data = response.json() + if not data.get("result"): + return None - from_time = _format_time(result.get("from", 0)) - at_time = _format_time(result.get("at", 0)) - - text = ( - f"šŸŽ¬ Source Found! ({similarity}% match)\n\n" - f"šŸ‡ÆšŸ‡µ Native: {html.escape(title_native)}\n" - f"šŸ® Romaji: {html.escape(title_romaji)}\n" - ) + res = data["result"][0] + res["similarity"] = round(res.get("similarity", 0) * 100, 2) + return res + +async def _send_tracemoe_res(message, status_msg, result, allow_nsfw): + anilist = result.get("anilist", {}) + is_adult = anilist.get("isAdult", False) + + if is_adult and not allow_nsfw: + await status_msg.edit_text("šŸ”ž NSFW content detected (Anime).\nUse /sauce -nsfw to see this result.", parse_mode="HTML") + return + + title = anilist.get("title", {}).get("romaji") or anilist.get("title", {}).get("native", "Unknown") + episode = result.get("episode", "Movie/OVA") + from_time = _format_time(result.get("from", 0)) + video_url = result.get("video") + + text = ( + f"šŸŽ¬ Anime Source Found! ({result['similarity']}%)\n\n" + f"šŸ“ŗ Title: {html.escape(title)}\n" + f"šŸŽžļø Episode: {episode}\n" + f"ā±ļø Timestamp: {from_time}" + ) + if allow_nsfw and is_adult: + text = "šŸ”ž NSFW Content Enabled\n\n" + text + + if video_url: + await message.reply_video(video_url, caption=text, parse_mode="HTML") + await status_msg.delete() + else: + await status_msg.edit_text(text, parse_mode="HTML") + +async def _search_saucenao(image_bytes: bytes, allow_nsfw: bool): + params = { + "output_type": 2, + "testmode": 1, + "numres": 1, + "db": 999 + } + if settings.saucenao_key: + params["api_key"] = settings.saucenao_key + + url = "https://saucenao.com/search.php" + async with httpx.AsyncClient() as client: + files = {"file": ("image.jpg", image_bytes, "image/jpeg")} + response = await client.post(url, params=params, files=files) + if response.status_code != 200: + return None + data = response.json() - if title_english: - text += f"šŸ‡ŗšŸ‡ø English: {html.escape(title_english)}\n" + results = data.get("results") + if not results: + return None - text += f"\nšŸŽžļø Episode: {episode}\n" - text += f"ā±ļø Timestamp: {from_time} (Matched at {at_time})\n" - - video_url = result.get("video") - if video_url: - await message.reply_video(video_url, caption=text, parse_mode="HTML") - await status_msg.delete() - else: - await status_msg.edit_text(text, parse_mode="HTML") + res = results[0] + res["similarity"] = float(res["header"]["similarity"]) + return res - except Exception as e: - logger.error(f"Trace.moe error: {e}") - await status_msg.edit_text("āŒ An error occurred while tracing the image.") +async def _send_saucenao_res(message, status_msg, result, allow_nsfw): + data = result["data"] + header = result["header"] + similarity = result["similarity"] + + title = data.get("title") or data.get("source") or "Unknown" + + meta = [] + if "eng_name" in data: meta.append(f"ENG: {data['eng_name']}") + if "jp_name" in data: meta.append(f"JP: {data['jp_name']}") + if "author" in data: meta.append(f"Author: {data['author']}") + if "part" in data: meta.append(f"Part/Chapter: {data['part']}") + + links = data.get("ext_urls", []) + + text = ( + f"šŸ“– Manga/Art Source Found! ({similarity}%)\n\n" + f"šŸ“ Source: {html.escape(title)}\n" + ) + if meta: + text += "\n".join(meta) + "\n" + + if allow_nsfw: + text = "šŸ”ž NSFW Content Enabled\n\n" + text + + if links: + from telegram import InlineKeyboardButton, InlineKeyboardMarkup + keyboard = InlineKeyboardMarkup([[InlineKeyboardButton("šŸ”— View Source", url=links[0])]]) + await status_msg.edit_text(text, reply_markup=keyboard, parse_mode="HTML") + else: + await status_msg.edit_text(text, parse_mode="HTML") def _format_time(seconds: float) -> str: m, s = divmod(int(seconds), 60)