Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .env.example
Original file line number Diff line number Diff line change
Expand Up @@ -5,3 +5,4 @@ DB_USER=root
DB_PASSWORD=your-password-here
DB_NAME=telegram_bot
LOG_LEVEL=INFO
SAUCENAO_KEY=
2 changes: 1 addition & 1 deletion COMMANDS.md
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ This document provides a comprehensive list of all available commands for this T
## 🌸 Anime & Manga
| Command | Usage | Description |
|---------|-------|-------------|
| `/sauce` | `/sauce` (reply) | Identify anime from image/screenshot |
| `/sauce` | `/sauce [-nsfw] [-manga] [-anime]` (reply) | Identify anime, manga, or art from image/screenshots. Falls back to SauceNAO for manga. |
| `/anime` | `/anime <query>` | Search anime info from MAL |
| `/manga` | `/manga <query>` | Search manga info from MAL |

Expand Down
7 changes: 4 additions & 3 deletions bot/config.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,11 @@
import os
from dataclasses import dataclass
from urllib.parse import quote_plus

from dotenv import load_dotenv

load_dotenv()


from urllib.parse import quote_plus

@dataclass(frozen=True)
class Settings:
bot_token: str
Expand All @@ -16,6 +15,7 @@ class Settings:
db_password: str
db_name: str
log_level: str
saucenao_key: str | None = None

@property
def database_url(self) -> str:
Expand All @@ -34,6 +34,7 @@ def load_settings() -> Settings:
db_password=os.getenv("DB_PASSWORD", ""),
db_name=os.getenv("DB_NAME", "telegram_bot"),
log_level=os.getenv("LOG_LEVEL", "INFO"),
saucenao_key=os.getenv("SAUCENAO_KEY"),
)


Expand Down
2 changes: 2 additions & 0 deletions bot/plugins/anime/info.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
import html

import httpx
from telegram import Update, InlineKeyboardButton, InlineKeyboardMarkup
from telegram.ext import Application, CommandHandler, ContextTypes

from bot.logger import get_logger

logger = get_logger(__name__)
Expand Down
164 changes: 122 additions & 42 deletions bot/plugins/anime/trace.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
import html

import httpx
from telegram import Update
from telegram.ext import Application, CommandHandler, ContextTypes

from bot.logger import get_logger
from bot.config import settings

logger = get_logger(__name__)

Expand All @@ -26,60 +29,137 @@ async def trace_anime(update: Update, context: ContextTypes.DEFAULT_TYPE):
photo = target_msg.document

if not photo:
await message.reply_text("Please reply to an image or sticker to trace its anime source.")
await message.reply_text("Please reply to an image, video, or sticker to find its source.")
return

status_msg = await message.reply_text("🔍 Searching for anime source...")
args = context.args
allow_nsfw = "-nsfw" in args
force_manga = "-manga" in args
force_anime = "-anime" in args
force_fanart = "-fanart" in args

status_msg = await message.reply_text("🔍 Searching for source...")

try:
file = await context.bot.get_file(photo.file_id)
image_bytes = bytes(await file.download_as_bytearray())

async with httpx.AsyncClient() as client:
image_data = await file.download_as_bytearray()
response = await client.post(
"https://api.trace.moe/search?cutBorders&anilistInfo",
files={"image": ("image.jpg", bytes(image_data), "image/jpeg")}
)
data = response.json()

if response.status_code != 200 or not data.get("result"):
await status_msg.edit_text("❌ Could not find any matching anime.")
if not (force_manga or force_fanart):
res = await _search_tracemoe(image_bytes, allow_nsfw)
if res and (res["similarity"] > 85 or force_anime):
await _send_tracemoe_res(message, status_msg, res, allow_nsfw)
return

res = await _search_saucenao(image_bytes, allow_nsfw)
if res and res["similarity"] > 60:
await _send_saucenao_res(message, status_msg, res, allow_nsfw)
return

result = data["result"][0]
anilist = result.get("anilist", {})
title_native = anilist.get("title", {}).get("native", "Unknown")
title_romaji = anilist.get("title", {}).get("romaji", "Unknown")
title_english = anilist.get("title", {}).get("english")

episode = result.get("episode", "Movie/OVA")
similarity = round(result.get("similarity", 0) * 100, 2)
await status_msg.edit_text("❌ Could not find a reliable source (Anime or Manga).")

except Exception as e:
logger.error(f"Sauce search error: {e}")
await status_msg.edit_text("❌ An error occurred while searching for the source.")

async def _search_tracemoe(image_bytes: bytes, allow_nsfw: bool):
url = "https://api.trace.moe/search?cutBorders&anilistInfo"
async with httpx.AsyncClient() as client:
response = await client.post(url, files={"image": ("image.jpg", image_bytes, "image/jpeg")})
if response.status_code != 200:
return None
data = response.json()
if not data.get("result"):
return None

from_time = _format_time(result.get("from", 0))
at_time = _format_time(result.get("at", 0))

text = (
f"🎬 <b>Source Found!</b> ({similarity}% match)\n\n"
f"🇯🇵 <b>Native:</b> {html.escape(title_native)}\n"
f"🏮 <b>Romaji:</b> {html.escape(title_romaji)}\n"
)
res = data["result"][0]
res["similarity"] = round(res.get("similarity", 0) * 100, 2)
return res

async def _send_tracemoe_res(message, status_msg, result, allow_nsfw):
anilist = result.get("anilist", {})
is_adult = anilist.get("isAdult", False)

if is_adult and not allow_nsfw:
await status_msg.edit_text("🔞 <b>NSFW content detected (Anime).</b>\nUse <code>/sauce -nsfw</code> to see this result.", parse_mode="HTML")
return

title = anilist.get("title", {}).get("romaji") or anilist.get("title", {}).get("native", "Unknown")
episode = result.get("episode", "Movie/OVA")
from_time = _format_time(result.get("from", 0))
video_url = result.get("video")

text = (
f"🎬 <b>Anime Source Found!</b> ({result['similarity']}%)\n\n"
f"📺 <b>Title:</b> {html.escape(title)}\n"
f"🎞️ <b>Episode:</b> {episode}\n"
f"⏱️ <b>Timestamp:</b> {from_time}"
)
if allow_nsfw and is_adult:
text = "🔞 <b>NSFW Content Enabled</b>\n\n" + text

if video_url:
await message.reply_video(video_url, caption=text, parse_mode="HTML")
await status_msg.delete()
else:
await status_msg.edit_text(text, parse_mode="HTML")

async def _search_saucenao(image_bytes: bytes, allow_nsfw: bool):
params = {
"output_type": 2,
"testmode": 1,
"numres": 1,
"db": 999
}
if settings.saucenao_key:
params["api_key"] = settings.saucenao_key

url = "https://saucenao.com/search.php"
async with httpx.AsyncClient() as client:
files = {"file": ("image.jpg", image_bytes, "image/jpeg")}
response = await client.post(url, params=params, files=files)
if response.status_code != 200:
return None
data = response.json()

if title_english:
text += f"🇺🇸 <b>English:</b> {html.escape(title_english)}\n"
results = data.get("results")
if not results:
return None

text += f"\n🎞️ <b>Episode:</b> {episode}\n"
text += f"⏱️ <b>Timestamp:</b> {from_time} (Matched at {at_time})\n"

video_url = result.get("video")
if video_url:
await message.reply_video(video_url, caption=text, parse_mode="HTML")
await status_msg.delete()
else:
await status_msg.edit_text(text, parse_mode="HTML")
res = results[0]
res["similarity"] = float(res["header"]["similarity"])
return res

except Exception as e:
logger.error(f"Trace.moe error: {e}")
await status_msg.edit_text("❌ An error occurred while tracing the image.")
async def _send_saucenao_res(message, status_msg, result, allow_nsfw):
data = result["data"]
header = result["header"]
similarity = result["similarity"]

title = data.get("title") or data.get("source") or "Unknown"

meta = []
if "eng_name" in data: meta.append(f"<b>ENG:</b> {data['eng_name']}")
if "jp_name" in data: meta.append(f"<b>JP:</b> {data['jp_name']}")
if "author" in data: meta.append(f"<b>Author:</b> {data['author']}")
if "part" in data: meta.append(f"<b>Part/Chapter:</b> {data['part']}")

links = data.get("ext_urls", [])

text = (
f"📖 <b>Manga/Art Source Found!</b> ({similarity}%)\n\n"
f"📝 <b>Source:</b> {html.escape(title)}\n"
)
if meta:
text += "\n".join(meta) + "\n"

if allow_nsfw:
text = "🔞 <b>NSFW Content Enabled</b>\n\n" + text

if links:
from telegram import InlineKeyboardButton, InlineKeyboardMarkup
keyboard = InlineKeyboardMarkup([[InlineKeyboardButton("🔗 View Source", url=links[0])]])
await status_msg.edit_text(text, reply_markup=keyboard, parse_mode="HTML")
else:
await status_msg.edit_text(text, parse_mode="HTML")

def _format_time(seconds: float) -> str:
m, s = divmod(int(seconds), 60)
Expand Down