-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathmain.py
More file actions
680 lines (592 loc) · 27.3 KB
/
main.py
File metadata and controls
680 lines (592 loc) · 27.3 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
from __future__ import annotations
import logging
import os
import sys
import datetime as dt
import threading
import http.server
import socketserver
from dataclasses import dataclass
from typing import Optional
from dotenv import load_dotenv
from telegram import InlineKeyboardButton, InlineKeyboardMarkup, Update
from telegram.constants import ParseMode
from telegram.ext import (
AIORateLimiter,
Application,
CallbackQueryHandler,
CommandHandler,
ContextTypes,
)
from zoneinfo import ZoneInfo
from aiohttp import web # NEW: for webhook server & /healthz
from quiz_engine import QuizEngine
from storage import (
init_db, record_result, get_score,
get_daily_count, inc_daily_count,
set_notify_time, get_notify_time,
mark_day_complete, get_streak, set_user_name, get_top_streaks,
iter_all_notify_prefs,
)
from questions import get_random_qa, QA
import pathlib
# Load settings from a local .env file. override=True means values in .env
# replace variables that are already set in the process environment.
load_dotenv(dotenv_path=pathlib.Path(".env"), override=True)
logging.basicConfig(
    format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
    level=logging.INFO,
)
logger = logging.getLogger("commit-quiz-bot")
# Number of /daily questions a user may answer per local calendar day.
DAILY_CAP = 5
# Timezone used when a user has no stored preference or supplies an invalid one.
DEFAULT_TZ = "Asia/Kolkata"
# === Webhook/Render config ===
# NOTE(review): the fallback "defaultsecret" makes the webhook path predictable
# when WEBHOOK_SECRET is unset — set a real secret in any deployed environment.
WEBHOOK_SECRET = os.getenv("WEBHOOK_SECRET", "defaultsecret")  # set a real one on Render
# Public base URL; its presence switches the bot to webhook mode (see main()).
RENDER_URL = os.getenv("RENDER_EXTERNAL_URL")  # present on Render
# Default URL path on which Telegram updates are received in webhook mode.
WEBHOOK_PATH = f"/telegram/{WEBHOOK_SECRET}"
# Default listening port (also the default for the --port CLI option).
PORT = int(os.getenv("PORT", "8000"))
# -------------------- Keep-alive HTTP server (local polling mode only; skipped on Render) --------------------
def _start_keepalive():
    """
    Run a tiny health-check HTTP server in a daemon thread (polling mode only).

    Keeps local dev healthy while using Telegram long-polling. On Render
    (webhook mode, ``RENDER_URL`` set) this is skipped so PTB can bind $PORT.

    The handler derives from ``BaseHTTPRequestHandler`` rather than
    ``SimpleHTTPRequestHandler``: the latter's inherited ``do_HEAD`` answers
    HEAD requests with metadata for files in the working directory (including
    ``.env``), which a health endpoint must never expose.
    """
    # Skip keepalive if on Render (webhook mode)
    if RENDER_URL:
        return

    port = int(os.environ.get("PORT", "8000"))
    health_paths = ("/", "/health", "/healthz")

    class Handler(http.server.BaseHTTPRequestHandler):
        def log_message(self, format, *args):
            # keep logs quieter (health checks ping a lot)
            return

        def _respond(self, include_body):
            # Shared GET/HEAD logic: 200 "ok" on health paths, 404 otherwise.
            if self.path in health_paths:
                body = b"ok"
                self.send_response(200)
                self.send_header("Content-Type", "text/plain; charset=utf-8")
                self.send_header("Content-Length", str(len(body)))
                self.end_headers()
                if include_body:
                    self.wfile.write(body)
            else:
                self.send_response(404)
                self.send_header("Content-Length", "0")
                self.end_headers()

        def do_GET(self):
            self._respond(include_body=True)

        def do_HEAD(self):
            # Explicit HEAD support so uptime probes work without touching the filesystem.
            self._respond(include_body=False)

    def serve():
        with socketserver.TCPServer(("", port), Handler) as httpd:
            logging.getLogger("commit-quiz-bot").info("Keepalive HTTP on %s", port)
            httpd.serve_forever()

    # Daemon thread: must not keep the process alive once the bot stops.
    threading.Thread(target=serve, daemon=True).start()
# === NEW: Health endpoint for webhook server ===
async def _healthz_handler(request: web.Request) -> web.Response:
    """Liveness probe: answer any request with the plain-text body ``ok``."""
    response = web.Response(text="ok")
    return response
# -------------------- Data models --------------------
@dataclass
class CurrentQuestion:
    """GitHub-mode question a user is currently answering (kept in user_data)."""

    # GitHub username the question was generated for.
    username: str
    # Question text shown to the user.
    text: str
    # Candidate answers rendered as inline buttons.
    options: list[int]
    # Index into `options` of the correct answer.
    correct_index: int
    # ISO-formatted date the question refers to.
    date_iso: str
@dataclass
class CSQuestion:
    """CS daily-quiz question currently awaiting an answer from a user."""

    # Topic label shown alongside the question.
    category: str
    # Question text shown to the user.
    text: str
    # Candidate answers rendered as inline buttons.
    options: list[str]
    # Index into `options` of the correct answer.
    correct_index: int
# Shared quiz engine used by the GitHub-mode handlers (/setuser, /quiz).
engine = QuizEngine()
# Markdown-formatted command reference sent by /help.
HELP_TEXT = (
    "👋 *Contribution Graph Pop Quiz*\n\n"
    "Commands:\n"
    "• `/start` — welcome\n"
    "• `/setuser <github-username>` — set your GitHub username (for /quiz)\n"
    "• `/quiz` — GitHub contribution-count question (original)\n"
    "• `/daily` — 5-question CS quiz (DSA, Cloud, Cybersecurity, DevOps, AI/ML, Data Science, General CS)\n"
    "• `/notify HH:MM [Area/City]` — daily reminder time (e.g., `/notify 07:30 Asia/Kolkata`)\n"
    "• `/when` — show your next reminder time\n"
    "• `/unnotify` — disable your daily reminder\n"
    "• `/streak` — show your current/best streak\n"
    "• `/streakboard` — top streaks in this chat\n"
    "• `/score` — overall score (GitHub flow)\n"
    "• `/forcecommit [n] [tag]` — manually trigger n commits (debug graph)\n"
    "• `/help` — this message\n"
)
# -------------------- Helpers --------------------
def safe_zoneinfo(tzname: str) -> ZoneInfo:
    """Return the zone for *tzname*, or the DEFAULT_TZ zone if it cannot be loaded."""
    try:
        zone = ZoneInfo(tzname)
    except Exception:
        # Best-effort: any lookup failure falls back to the default zone.
        zone = ZoneInfo(DEFAULT_TZ)
    return zone
def _today_ymd(tzname: str) -> str:
    """Return today's date (ISO ``YYYY-MM-DD``) as seen in the *tzname* zone."""
    local_now = dt.datetime.now(tz=safe_zoneinfo(tzname))
    return local_now.date().isoformat()
def _display_name(u) -> str:
    """
    Build a human-readable name for user object *u*.

    Preference order: "first last" (whichever parts are set), then the
    username, then the literal ``User <id>``.
    """
    name_parts = filter(None, (u.first_name or "", u.last_name))
    full_name = " ".join(name_parts).strip()
    if full_name:
        return full_name
    if u.username:
        return u.username
    return f"User {u.id}"
# ---- JobQueue-safe storage for CS questions (so scheduled jobs work) ----
def _store_cs_question(context: ContextTypes.DEFAULT_TYPE, user_id: int, csq: "CSQuestion") -> None:
    """
    Remember *csq* as the pending CS question for *user_id*.

    Interactive updates have a per-user ``context.user_data`` dict, which is
    used directly. When it is not a dict (e.g. a job callback without a user),
    the question is written to ``context.application.user_data[user_id]``.

    ``Application.user_data`` is exposed by PTB as a read-only
    ``MappingProxyType``, so a plain ``isinstance(..., dict)`` check never
    matches and ``setdefault`` is unavailable; the per-user bucket is obtained
    by indexing instead. Plain mutable mappings are still supported.

    Logs a warning when no writable per-user storage can be found.
    """
    from collections.abc import Mapping, MutableMapping

    user_data = getattr(context, "user_data", None)
    if isinstance(user_data, dict):
        user_data["cs_q"] = csq
        return

    try:
        app_ud = context.application.user_data  # type: ignore[attr-defined]
    except Exception:
        app_ud = None

    bucket = None
    if isinstance(app_ud, MutableMapping):
        bucket = app_ud.setdefault(user_id, {})
    elif isinstance(app_ud, Mapping):
        # Read-only view: indexing returns the per-user dict when one exists
        # (or is created on access by the underlying mapping).
        try:
            bucket = app_ud[user_id]
        except KeyError:
            bucket = None

    if isinstance(bucket, MutableMapping):
        bucket["cs_q"] = csq
    else:
        logger.warning("Could not persist cs_q (no user_data/app.user_data available).")
def _load_cs_question(update: Update, context: ContextTypes.DEFAULT_TYPE) -> Optional["CSQuestion"]:
    """
    Return the pending CS question for the user behind *update*, or ``None``.

    Looks in ``context.user_data`` first, then falls back to
    ``context.application.user_data[<user id>]``. The fallback accepts any
    mapping: PTB exposes ``Application.user_data`` as a read-only
    ``MappingProxyType``, which an ``isinstance(..., dict)`` check would
    reject.
    """
    from collections.abc import Mapping

    user_data = getattr(context, "user_data", None)
    if isinstance(user_data, dict):
        csq = user_data.get("cs_q")
        if csq is not None:
            return csq

    user = getattr(update, "effective_user", None)
    app_ud = getattr(getattr(context, "application", None), "user_data", None)
    if user is None or not isinstance(app_ud, Mapping):
        return None

    # .get() avoids creating an empty bucket for users we have never seen.
    bucket = app_ud.get(user.id)
    if isinstance(bucket, Mapping):
        return bucket.get("cs_q")
    return None
# -------------------- GitHub quiz (original mode) --------------------
async def start(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Handle ``/start``: record the sender's display name and send the welcome text."""
    sender = update.effective_user
    set_user_name(update.effective_chat.id, sender.id, _display_name(sender))
    welcome = (
        "Welcome to *Contribution Graph Pop Quiz*! 🎯\n\n"
        "GitHub mode: `/setuser <username>` then `/quiz`\n"
        "CS Daily mode: `/daily` (5 questions/day). Set reminder with `/notify HH:MM [TZ]`.\n"
    )
    await update.message.reply_text(welcome, parse_mode=ParseMode.MARKDOWN)
async def help_cmd(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Handle ``/help``: record the sender's display name and send HELP_TEXT."""
    sender = update.effective_user
    set_user_name(update.effective_chat.id, sender.id, _display_name(sender))
    await update.message.reply_text(HELP_TEXT, parse_mode=ParseMode.MARKDOWN)
async def setuser(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """
    Handle ``/setuser <github-username>``.

    The username is accepted only if the quiz engine can load its data; on
    success it is stored in ``context.user_data["username"]`` for ``/quiz``.
    """
    sender = update.effective_user
    set_user_name(update.effective_chat.id, sender.id, _display_name(sender))

    args = context.args
    if not args:
        await update.message.reply_text("Usage: `/setuser <github-username>`", parse_mode=ParseMode.MARKDOWN)
        return

    username = args[0].strip()
    try:
        # Called only for validation; the loaded data itself is discarded here.
        engine.load_user_year(username)
    except Exception:
        failure_text = (
            "❌ Couldn't validate that username via the contributions graph. "
            "Please check the spelling (case-sensitive) and try again."
        )
        await update.message.reply_text(failure_text)
        return

    context.user_data["username"] = username
    confirmation = f"✅ Saved GitHub username: *{username}*\nUse `/quiz` to begin!"
    await update.message.reply_text(confirmation, parse_mode=ParseMode.MARKDOWN)
def _format_options(options: list[int]) -> InlineKeyboardMarkup:
    """
    Build the GitHub-quiz keyboard: answers labelled A-D, two per row,
    followed by a "Next question" row. At most four options are rendered.
    """
    answer_buttons = [
        InlineKeyboardButton(f"{letter}: {value}", callback_data=f"opt:{position}")
        for position, (letter, value) in enumerate(zip("ABCD", options))
    ]
    # Chunk the flat button list into rows of two (a trailing odd button gets its own row).
    keyboard = [answer_buttons[start:start + 2] for start in range(0, len(answer_buttons), 2)]
    keyboard.append([InlineKeyboardButton("⏭ Next question", callback_data="next")])
    return InlineKeyboardMarkup(keyboard)
async def _ask_question(chat_id: int, context: ContextTypes.DEFAULT_TYPE, username: str):
    """
    Generate a GitHub-mode question for *username*, remember it in
    ``context.user_data["current_q"]`` and send it to *chat_id* with answer buttons.
    """
    question = engine.make_question(username)
    pending = CurrentQuestion(
        username=username,
        text=question.text,
        options=question.options,
        correct_index=question.correct_index,
        date_iso=question.date.isoformat(),
    )
    context.user_data["current_q"] = pending

    prompt = f"🧩 *{question.text}*\n\nPick one:"
    keyboard = _format_options(question.options)
    await context.bot.send_message(
        chat_id=chat_id,
        text=prompt,
        reply_markup=keyboard,
        parse_mode=ParseMode.MARKDOWN,
    )
async def quiz(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """Handle ``/quiz``: ask a GitHub-mode question for the stored username, if any."""
    sender = update.effective_user
    chat_id = update.effective_chat.id
    set_user_name(chat_id, sender.id, _display_name(sender))

    username = context.user_data.get("username")
    if username:
        await _ask_question(update.effective_chat.id, context, username)
        return

    # No username stored yet: point the user at /setuser.
    await update.message.reply_text(
        "First set your GitHub username: `/setuser <username>`",
        parse_mode=ParseMode.MARKDOWN,
    )
async def score(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """
    Handle ``/score``: report the sender's correct/total tally for this chat.

    Both replies use Markdown; the "no answers yet" reply previously omitted
    ``parse_mode`` and so showed its backticks literally.
    """
    chat_id = update.effective_chat.id
    user_id = update.effective_user.id
    set_user_name(chat_id, user_id, _display_name(update.effective_user))

    correct, total = get_score(chat_id, user_id)
    if total == 0:
        text = "You haven't answered any questions yet. Use `/quiz` to start!"
    else:
        text = f"📊 Score: *{correct} / {total}* correct."
    await update.message.reply_text(text, parse_mode=ParseMode.MARKDOWN)
async def cb_handler(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """
    Handle GitHub-mode button presses (``opt:<index>`` and ``next``).

    ``next`` asks a fresh question for the same username. ``opt:<index>``
    grades the answer against the question stored in
    ``context.user_data["current_q"]``, records the result and edits the
    message to show the verdict.

    Every reply is sent with Markdown parsing; the error replies previously
    omitted ``parse_mode`` and showed their backticks literally.
    """
    query = update.callback_query
    await query.answer()  # acknowledge the press so the client stops its spinner
    set_user_name(query.message.chat_id, query.from_user.id, _display_name(query.from_user))
    data = query.data

    current: Optional[CurrentQuestion] = context.user_data.get("current_q")
    if not current:
        await query.edit_message_text(
            "Session expired. Use `/quiz` to start a new question.",
            parse_mode=ParseMode.MARKDOWN,
        )
        return

    if data == "next":
        await _ask_question(update.effective_chat.id, context, current.username)
        return

    if not data.startswith("opt:"):
        await query.edit_message_text(
            "Invalid action. Use `/quiz` to start again.",
            parse_mode=ParseMode.MARKDOWN,
        )
        return

    try:
        idx = int(data.split(":")[1])
    except ValueError:
        await query.edit_message_text(
            "Invalid option. Use `/quiz` to start again.",
            parse_mode=ParseMode.MARKDOWN,
        )
        return

    is_correct = (idx == current.correct_index)
    record_result(update.effective_chat.id, update.effective_user.id, is_correct)

    if is_correct:
        verdict = "✅ Correct!"
    else:
        verdict = f"❌ Incorrect. The right answer was *{current.options[current.correct_index]}*."
    explain = f"_GitHub contributions on {current.date_iso}_"
    await query.edit_message_text(
        text=f"{verdict}\n\n{explain}",
        parse_mode=ParseMode.MARKDOWN,
        reply_markup=_format_options(current.options),
    )
# -------------------- CS Daily quiz --------------------
def _format_cs_options(options: list[str]) -> InlineKeyboardMarkup:
    """
    Build the CS-quiz keyboard: answers labelled A-D, two per row,
    followed by a "Next question" row. At most four options are rendered.
    """
    answer_buttons = [
        InlineKeyboardButton(f"{letter}: {value}", callback_data=f"cs:opt:{position}")
        for position, (letter, value) in enumerate(zip("ABCD", options))
    ]
    # Chunk the flat button list into rows of two (a trailing odd button gets its own row).
    keyboard = [answer_buttons[start:start + 2] for start in range(0, len(answer_buttons), 2)]
    keyboard.append([InlineKeyboardButton("⏭ Next question", callback_data="cs:next")])
    return InlineKeyboardMarkup(keyboard)
async def _ask_cs_question(chat_id: int, context: ContextTypes.DEFAULT_TYPE, tzname: str, user_id: Optional[int] = None):
    """
    Pick a random CS question, remember it for the user and send it to *chat_id*.

    *tzname* is accepted for signature compatibility and is not used here.
    When *user_id* is omitted, the id is looked up from ``context.update``;
    if that fails the question is sent without being stored.

    NOTE(review): category/question text is sent with Markdown parsing as-is —
    confirm the question bank never contains unbalanced `_`, `*` or backticks.
    """
    qa: QA = get_random_qa()
    csq = CSQuestion(
        category=qa.category,
        text=qa.question,
        options=qa.options,
        correct_index=qa.correct_index,
    )

    owner_id = user_id
    if owner_id is None:
        try:
            owner_id = context.update.effective_user.id  # type: ignore[attr-defined]
        except Exception:
            owner_id = None
    if owner_id is not None:
        _store_cs_question(context, owner_id, csq)

    prompt = f"🧠 *{csq.category}*: {csq.text}"
    keyboard = _format_cs_options(csq.options)
    await context.bot.send_message(
        chat_id=chat_id,
        text=prompt,
        reply_markup=keyboard,
        parse_mode=ParseMode.MARKDOWN,
    )
async def daily(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """
    Handle ``/daily``: send the next CS question unless today's cap is reached.

    "Today" is evaluated in the user's stored reminder timezone, or DEFAULT_TZ
    when none is stored. The completion reply uses ``*bold*`` markers and is
    therefore sent with Markdown parsing (it previously was not, so the
    asterisks were shown literally).
    """
    chat_id = update.effective_chat.id
    user_id = update.effective_user.id
    set_user_name(chat_id, user_id, _display_name(update.effective_user))

    prefs = get_notify_time(chat_id, user_id)
    tzname = prefs[2] if prefs else DEFAULT_TZ
    today = _today_ymd(tzname)

    answered = get_daily_count(chat_id, user_id, today)
    if answered >= DAILY_CAP:
        st, best, _ = get_streak(chat_id, user_id)
        await update.message.reply_text(
            f"🎉 You've completed today's {DAILY_CAP}. Streak: *{st}* (best *{best}*). See you tomorrow!",
            parse_mode=ParseMode.MARKDOWN,
        )
        return

    await _ask_cs_question(chat_id, context, tzname, user_id=user_id)
async def cs_cb_handler(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """
    Handle CS-quiz button presses (``cs:opt:<index>`` and ``cs:next``).

    ``cs:next`` sends another question unless today's cap is reached.
    ``cs:opt:<index>`` grades the answer, bumps today's count and, when the
    count reaches DAILY_CAP, marks the day complete and triggers the optional
    GitHub commits.

    Fixes relative to the previous version:
    * An answer pressed after the cap is already reached no longer increments
      the count or re-runs ``mark_day_complete`` / the commit trigger; the
      "done for today" message is shown instead.
    * All replies that contain Markdown markers are sent with ``parse_mode``.

    NOTE(review): answer buttons stay on the message after grading, so the same
    question can be answered repeatedly until the cap is hit — confirm whether
    that is intended.
    """
    query = update.callback_query
    await query.answer()  # acknowledge the press so the client stops its spinner
    set_user_name(query.message.chat_id, query.from_user.id, _display_name(query.from_user))
    data = query.data

    chat_id = update.effective_chat.id
    user_id = update.effective_user.id
    prefs = get_notify_time(chat_id, user_id)
    tzname = prefs[2] if prefs else DEFAULT_TZ
    today = _today_ymd(tzname)

    async def _show_done() -> None:
        # Replace the message with the "cap reached" summary.
        st, best, _ = get_streak(chat_id, user_id)
        await query.edit_message_text(
            f"🎉 Done for today — {DAILY_CAP}/{DAILY_CAP}. Streak: *{st}* (best *{best}*).",
            parse_mode=ParseMode.MARKDOWN,
        )

    csq: Optional[CSQuestion] = _load_cs_question(update, context)
    if not csq:
        await query.edit_message_text(
            "Session expired. Use `/daily` to start again.",
            parse_mode=ParseMode.MARKDOWN,
        )
        return

    if data == "cs:next":
        if get_daily_count(chat_id, user_id, today) >= DAILY_CAP:
            await _show_done()
            return
        await _ask_cs_question(chat_id, context, tzname, user_id=user_id)
        return

    if not data.startswith("cs:opt:"):
        await query.edit_message_text(
            "Invalid action. Use `/daily` to start again.",
            parse_mode=ParseMode.MARKDOWN,
        )
        return

    try:
        idx = int(data.split(":")[2])
    except ValueError:
        await query.edit_message_text(
            "Invalid option. Use `/daily` to start again.",
            parse_mode=ParseMode.MARKDOWN,
        )
        return

    # Guard: once the cap is reached, further answers must not be counted or
    # re-trigger day completion and commits.
    if get_daily_count(chat_id, user_id, today) >= DAILY_CAP:
        await _show_done()
        return

    is_correct = (idx == csq.correct_index)
    count_after = inc_daily_count(chat_id, user_id, today)

    streak_msg = ""
    if count_after >= DAILY_CAP:
        streak_now, best, _ = mark_day_complete(chat_id, user_id, today)
        streak_msg = f"\n\n🔥 *Streak*: {streak_now} day(s) (best {best})"
        # Trigger 5 commits when the day's 5 questions are done. Failures are
        # logged and must not prevent the verdict from being shown.
        try:
            from github_committer import make_daily_commits_if_configured
            info = make_daily_commits_if_configured(n=5, tag=str(user_id))
            if info:
                logger.info(info)
        except Exception:
            logger.exception("GitHub commit failed")

    if is_correct:
        verdict = "✅ Correct!"
    else:
        verdict = f"❌ Incorrect. The right answer was *{csq.options[csq.correct_index]}*."
    footer = f"Progress today: {min(count_after, DAILY_CAP)} / {DAILY_CAP}{streak_msg}"
    await query.edit_message_text(
        text=f"{verdict}\n\n_{csq.category}_\n\n{footer}",
        parse_mode=ParseMode.MARKDOWN,
        reply_markup=_format_cs_options(csq.options),
    )
# -------------------- Daily reminder scheduling --------------------
async def _daily_job(context: ContextTypes.DEFAULT_TYPE):
    """
    Job callback for reminders: send one CS question to the job's chat unless
    the user has already reached today's cap.

    Expects ``job.data`` to carry ``"user_id"`` and ``"tz"``.
    """
    job = context.job
    payload = job.data
    chat_id, user_id, tzname = job.chat_id, payload["user_id"], payload["tz"]

    already_answered = get_daily_count(chat_id, user_id, _today_ymd(tzname))
    if already_answered < DAILY_CAP:
        await _ask_cs_question(chat_id, context, tzname, user_id=user_id)
def _reschedule_all_jobs(app: Application):
    """
    Rebuild every daily reminder job from the stored notification preferences,
    so reminders survive bot restarts/redeploys.

    Existing jobs whose name starts with ``daily-`` are removed first to avoid
    duplicates. Logs an error and does nothing when no JobQueue is available.
    """
    from datetime import time as dtime

    queue = app.job_queue
    if queue is None:
        logger.error('JobQueue not available. Install PTB with: pip install "python-telegram-bot[job-queue]"')
        return

    # Drop previously scheduled daily-* jobs before re-adding them.
    stale_jobs = [job for job in queue.jobs() if job.name and job.name.startswith("daily-")]
    for job in stale_jobs:
        job.schedule_removal()

    scheduled = 0
    for chat_id, user_id, hour, minute, tzname in iter_all_notify_prefs():
        fire_at = dtime(hour=hour, minute=minute, tzinfo=safe_zoneinfo(tzname))
        queue.run_daily(
            _daily_job,
            time=fire_at,
            name=f"daily-{chat_id}-{user_id}",
            chat_id=chat_id,
            data={"user_id": user_id, "tz": tzname},
        )
        scheduled += 1
    logger.info("Rescheduled %d daily reminder job(s) from DB.", scheduled)
# -------------------- /notify + /when + /unnotify --------------------
async def notify(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """
    /notify HH:MM [Area/City]   e.g., /notify 07:30 Asia/Kolkata
    Sets a daily reminder + sends a 2s test question to confirm it's armed.

    Fixes relative to the previous version:
    * An unknown timezone is now rejected. ``safe_zoneinfo`` never raises (it
      falls back to DEFAULT_TZ), so the old ``try`` block could not detect a
      bad zone and the invalid name was stored and echoed back.
    * Range validation no longer relies on ``assert`` (stripped under ``-O``).
    * The timezone name is Markdown-escaped in the reply; names such as
      ``America/New_York`` contain ``_`` which is an italic marker.
    """
    from telegram.helpers import escape_markdown

    chat_id = update.effective_chat.id
    user_id = update.effective_user.id
    set_user_name(chat_id, user_id, _display_name(update.effective_user))

    if not context.args:
        await update.message.reply_text(
            "Usage: `/notify HH:MM [Area/City]`\nExample: `/notify 07:30 Asia/Kolkata`",
            parse_mode=ParseMode.MARKDOWN,
        )
        return

    time_part = context.args[0]
    tzname = context.args[1] if len(context.args) > 1 else DEFAULT_TZ

    try:
        hour, minute = map(int, time_part.split(":"))
    except ValueError:
        # Non-numeric parts or not exactly two ":"-separated fields.
        hour = minute = -1
    time_ok = 0 <= hour <= 23 and 0 <= minute <= 59

    # safe_zoneinfo silently falls back to DEFAULT_TZ; a key mismatch therefore
    # means the requested zone could not be loaded.
    tz = safe_zoneinfo(tzname)
    tz_ok = tz.key == tzname

    if not (time_ok and tz_ok):
        await update.message.reply_text(
            "❌ Invalid time or timezone. Example: `/notify 07:30 Asia/Kolkata`",
            parse_mode=ParseMode.MARKDOWN,
        )
        return

    set_notify_time(chat_id, user_id, hour, minute, tzname)

    app: Application = context.application
    job_name = f"daily-{chat_id}-{user_id}"

    if app.job_queue is None:
        logger.error('python-telegram-bot[job-queue] is not installed. '
                     'Run: pip install "python-telegram-bot[job-queue]"')
        await update.message.reply_text("🚫 Job queue not available. Please install PTB with job-queue extra.")
        return

    # Replace any reminder previously scheduled for this chat/user pair.
    for j in app.job_queue.get_jobs_by_name(job_name):
        j.schedule_removal()

    # Next occurrence of HH:MM in the user's zone (used only for the reply text).
    now = dt.datetime.now(tz=tz)
    target = now.replace(hour=hour, minute=minute, second=0, microsecond=0)
    if target <= now:
        target = target + dt.timedelta(days=1)

    from datetime import time as dtime
    job_payload = {"user_id": user_id, "tz": tzname}
    app.job_queue.run_daily(
        _daily_job,
        time=dtime(hour=hour, minute=minute, tzinfo=tz),
        name=job_name,
        chat_id=chat_id,
        data=dict(job_payload),
    )

    # One-off run two seconds from now so the user sees the reminder works.
    app.job_queue.run_once(
        _daily_job,
        when=2,
        name=f"test-{job_name}",
        chat_id=chat_id,
        data=dict(job_payload),
    )

    pretty_next = target.strftime("%Y-%m-%d %H:%M")
    tz_md = escape_markdown(tzname)
    await update.message.reply_text(
        f"⏰ Daily reminder set for *{hour:02d}:{minute:02d}* ({tz_md}).\n"
        f"Next run: *{pretty_next}* {tz_md}\n"
        f"✅ I’ll send a test question in ~2s to confirm.",
        parse_mode=ParseMode.MARKDOWN,
    )
async def when_cmd(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """
    Handle ``/when``: show the next time the stored daily reminder will fire.

    The timezone name is Markdown-escaped before being sent: names such as
    ``America/New_York`` contain a single ``_``, which legacy Markdown treats
    as an unterminated italic marker and Telegram rejects.
    """
    from telegram.helpers import escape_markdown

    chat_id = update.effective_chat.id
    user_id = update.effective_user.id
    set_user_name(chat_id, user_id, _display_name(update.effective_user))

    prefs = get_notify_time(chat_id, user_id)
    if not prefs:
        await update.message.reply_text(
            "No reminder set. Use `/notify HH:MM [Area/City]` first.",
            parse_mode=ParseMode.MARKDOWN,
        )
        return

    hour, minute, tzname = prefs
    tz = safe_zoneinfo(tzname)
    now = dt.datetime.now(tz=tz)
    # Today's HH:MM in the user's zone, rolled to tomorrow if already past.
    target = now.replace(hour=hour, minute=minute, second=0, microsecond=0)
    if target <= now:
        target = target + dt.timedelta(days=1)

    await update.message.reply_text(
        f"🗓️ Next reminder: *{target.strftime('%Y-%m-%d %H:%M')}* ({escape_markdown(tzname)})",
        parse_mode=ParseMode.MARKDOWN,
    )
async def unnotify(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """
    Handle ``/unnotify``: cancel the sender's scheduled daily reminder job(s).

    NOTE(review): only the in-memory job is removed here; the stored
    notification preference is not touched, so `_reschedule_all_jobs` would
    presumably recreate the reminder after a restart — confirm.
    """
    sender = update.effective_user
    chat_id = update.effective_chat.id
    set_user_name(chat_id, sender.id, _display_name(sender))

    app: Application = context.application
    if app.job_queue is None:
        await update.message.reply_text("No active daily reminder to cancel (job queue unavailable).")
        return

    job_name = f"daily-{update.effective_chat.id}-{update.effective_user.id}"
    cancelled = 0
    for job in app.job_queue.get_jobs_by_name(job_name):
        job.schedule_removal()
        cancelled += 1

    reply = "🛑 Daily reminder disabled." if cancelled else "No active daily reminder to cancel."
    await update.message.reply_text(reply)
# -------------------- Streaks --------------------
async def streak(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """
    Handle ``/streak``: show the sender's current and best daily-quiz streak.

    Both replies are sent with Markdown parsing; the "no streak yet" reply
    previously omitted ``parse_mode`` and showed its backticks literally.
    """
    chat_id = update.effective_chat.id
    user_id = update.effective_user.id
    set_user_name(chat_id, user_id, _display_name(update.effective_user))

    st, best, last = get_streak(chat_id, user_id)
    if st == 0:
        text = "No streak yet — answer all 5 `/daily` questions today to start a streak! 🔥"
    else:
        last_text = f" (last completed: {last})" if last else ""
        text = f"🔥 *Streak*: {st} day(s) — *Best*: {best}{last_text}"
    await update.message.reply_text(text, parse_mode=ParseMode.MARKDOWN)
async def streakboard(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """
    Handle ``/streakboard``: list the top 10 streaks recorded for this chat.

    Display names are user-controlled and are Markdown-escaped before being
    interpolated; an unescaped ``_`` or ``*`` in a single name would otherwise
    break parsing of the whole leaderboard message. The empty-board reply is
    also sent with Markdown parsing so its backticks render as code.
    """
    from telegram.helpers import escape_markdown

    chat_id = update.effective_chat.id
    set_user_name(chat_id, update.effective_user.id, _display_name(update.effective_user))

    rows = get_top_streaks(chat_id, limit=10)
    if not rows:
        await update.message.reply_text(
            "No streaks yet in this chat. Be the first: complete `/daily` today!",
            parse_mode=ParseMode.MARKDOWN,
        )
        return

    lines = ["🏆 *Top Streaks*"]
    lines.extend(
        f"{rank}. {escape_markdown(str(name))}: *{st}* (best {best})"
        for rank, (_uid, st, best, name) in enumerate(rows, start=1)
    )
    await update.message.reply_text("\n".join(lines), parse_mode=ParseMode.MARKDOWN)
# -------------------- Force commits --------------------
async def forcecommit(update: Update, context: ContextTypes.DEFAULT_TYPE):
    """
    Handle ``/forcecommit [n] [tag]``: manually trigger *n* commits (default 1,
    minimum 1) tagged with *tag* (default: the sender's user id).

    NOTE(review): no caller authorization and no upper bound on *n* are
    enforced here — confirm this command is meant to be open to every user.
    """
    sender = update.effective_user
    set_user_name(update.effective_chat.id, sender.id, _display_name(sender))

    args = context.args
    arg_count = len(args)
    n = 1
    tag = str(update.effective_user.id)

    if arg_count >= 1:
        try:
            n = max(1, int(args[0]))
        except ValueError:
            await update.message.reply_text(
                "Usage: `/forcecommit [n] [tag]` (n must be an integer)",
                parse_mode=ParseMode.MARKDOWN,
            )
            return
    if arg_count >= 2:
        tag = args[1]

    try:
        # Imported lazily so the bot still starts when the committer module is absent.
        from github_committer import make_daily_commits_if_configured
        info = make_daily_commits_if_configured(n=n, tag=tag)
        await update.message.reply_text(info or "No result returned.")
        logger.info("forcecommit: %s", info)
    except Exception as e:
        logger.exception("forcecommit error", exc_info=e)
        await update.message.reply_text(f"Commit failed: {e}")
# -------------------- Entrypoint --------------------
def main():
    """
    Entry point: parse CLI options, build the PTB application, register all
    handlers, restore reminder jobs, then run in webhook or polling mode.

    Webhook mode is used when a public base URL is known (RENDER_EXTERNAL_URL
    or ``--base-url``) or ``--webhook`` is passed; otherwise the bot polls and
    starts the local keep-alive HTTP server.

    The webhook URL is built from a normalized base URL (no trailing slash) and
    path (exactly one leading slash), so a ``--base-url`` ending in ``/`` or a
    ``--path`` given without ``/`` no longer yields a URL that differs from the
    route being served.

    Exits with status 1 when BOT_TOKEN is missing, or when webhook mode is
    requested without a base URL.
    """
    import argparse

    parser = argparse.ArgumentParser(description="Contribution Graph Pop Quiz Bot")
    parser.add_argument("--webhook", action="store_true", help="Force webhook mode (otherwise auto if RENDER url present)")
    parser.add_argument("--base-url", default=os.environ.get("BASE_URL", ""), help="Public base URL override (for local testing)")
    parser.add_argument("--path", default=os.environ.get("WEBHOOK_PATH", ""), help="Webhook path override")
    parser.add_argument("--port", type=int, default=PORT, help="Port to listen on")
    parser.add_argument("--listen", default=os.environ.get("LISTEN", "0.0.0.0"), help="Host to bind")
    args = parser.parse_args()

    token = os.environ.get("BOT_TOKEN")
    if not token:
        logger.error("Missing BOT_TOKEN. Set it in the environment or .env file.")
        sys.exit(1)

    # Ensure DB exists/migrated
    init_db()

    # Build PTB app
    application = (
        Application.builder()
        .token(token)
        .rate_limiter(AIORateLimiter())
        .build()
    )

    # Commands (registration order preserved)
    command_handlers = (
        ("start", start),
        ("help", help_cmd),
        ("setuser", setuser),
        ("quiz", quiz),
        ("score", score),
        ("daily", daily),
        ("notify", notify),
        ("when", when_cmd),
        ("unnotify", unnotify),
        ("streak", streak),
        ("streakboard", streakboard),
        ("forcecommit", forcecommit),
    )
    for command, callback in command_handlers:
        application.add_handler(CommandHandler(command, callback))

    # Callbacks: GitHub-mode buttons first, then CS-mode ("cs:"-prefixed) buttons
    application.add_handler(CallbackQueryHandler(cb_handler, pattern=r"^(opt:|next$)"))
    application.add_handler(CallbackQueryHandler(cs_cb_handler, pattern=r"^cs:"))

    # Rebuild daily jobs from DB so schedules persist across restarts/redeploys
    _reschedule_all_jobs(application)

    # Best-effort: attach /healthz to a `web_app` attribute if the application has one.
    # NOTE(review): failures are only logged at DEBUG level — confirm that
    # `Application` actually exposes `web_app` in the installed PTB version.
    try:
        application.web_app.add_routes([web.get("/healthz", _healthz_handler)])
    except Exception as e:
        logger.debug("Could not attach /healthz to PTB web_app yet: %s", e)

    # Decide mode: webhook on Render, polling locally
    base_url = (RENDER_URL or args.base_url).rstrip("/")
    path = args.path or WEBHOOK_PATH
    if not path.startswith("/"):
        path = f"/{path}"
    port = args.port
    listen = args.listen

    if base_url or args.webhook:
        # WEBHOOK MODE (Render or forced)
        if not base_url:
            logger.error("Webhook requested but no base URL provided.")
            sys.exit(1)

        webhook_url = f"{base_url}{path}"
        logger.info("Starting in WEBHOOK mode")
        logger.info("Public base URL: %s", base_url)
        # NOTE(review): the default path embeds WEBHOOK_SECRET, so these two
        # log lines write the secret to the log — consider redacting.
        logger.info("Webhook path: %s", path)
        logger.info("Setting webhook to: %s", webhook_url)

        application.run_webhook(
            listen=listen,
            port=port,
            url_path=path,
            webhook_url=webhook_url,
            # drop_pending_updates=True,  # optional
        )
    else:
        # POLLING MODE (local/dev). Start tiny keepalive server here only.
        _start_keepalive()
        logger.info("Starting in POLLING mode")
        application.run_polling(close_loop=False)
# Run the bot only when this file is executed as a script, not when imported.
if __name__ == "__main__":
    main()