-
Notifications
You must be signed in to change notification settings - Fork 4
Expand file tree
/
Copy pathforwarder.py
More file actions
884 lines (762 loc) · 33 KB
/
Copy pathforwarder.py
File metadata and controls
884 lines (762 loc) · 33 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
# How to set up:
#
# 1. Rename environ.env to .env and update the values API_ID, API_HASH, and OWNER_ID.
#
#
import logging
import asyncio
import sys
import time
from collections import defaultdict
from datetime import datetime
from typing import Optional, Dict, Set, List
from pyrogram.errors import FloodWait
from pyrogram import Client, filters
from pyrogram.types import Message
from rich.console import Console
from bot.configs import Config
from bot.database import Database
from bot.helper import HelperClass
from bot.settings_manager import SettingsManager
from bot.utils import ForwardStats, MessageQueue, setup_logging
# Configure logging
setup_logging()

## --------------------
# Telegram API credentials, read from the project Config object
api_id = Config.API_ID
api_hash = Config.API_HASH

# Configuration constants (seconds; forward_command sleeps this long after a successful copy)
COPY_DELAY_SECONDS_M = 1  # Delay after each copy operation to prevent FloodWait
COPY_DELAY_SECONDS_MG = 2  # Delay for media groups

# Initialize components
console = Console()
db = Database("user_data")  # Initialize with user_data directory
active_forwards: Dict[int, MessageQueue] = {}  # Track active forward operations by user
forward_stats: Dict[int, ForwardStats] = {}  # Track statistics by user
# retry_handler = RetryHandler()
# NOTE(review): reset_confirmations is not referenced anywhere else in the visible code.
reset_confirmations = {}  # Track reset confirmations

# Initialize the Pyrogram client shared by every handler below
app = Client(
    "my_account",
    api_id=api_id,
    api_hash=api_hash,
    app_version="Forwarder v1.0",
    device_model="Server",
    system_version="1.0",
)

# Initialize handlers (both receive the shared client and database)
conversation_handler = HelperClass(app, db)
settings_manager = SettingsManager(app, db)
# ------------- Command Handlers ------------- #
@app.on_message(
    filters.command("start") & filters.user(Config.OWNER_ID) & filters.private
)
async def start_command(client: Client, message: Message):
    """Answer `/start` with a short welcome that points the owner to `/help`."""
    welcome = """
Hi, welcome to the Best Forwarder Bot! 🤖\n
Send `/help` to see available commands.\n"""
    await message.reply(welcome, disable_web_page_preview=True)
@app.on_message(
    filters.command("help") & filters.user(Config.OWNER_ID) & filters.private
)
async def help_command(client: Client, message: Message):
    """Answer `/help` with the list of commands this bot understands."""
    command_overview = """
Hello! 👋\n
I am your personal forwarder bot.\nI can help you forward files from one chat to another with ease.\n\n
Here are the available commands:\n
**/start:** Start the bot\n
**/set_ids (or /set):** source_chat_id target_chat_id - Set the source and target chat IDs for forwarding.\n
**/forward (or /f):** Start forwarding files from the source chat to the target chat.\n
**/stop:** Stop an ongoing forward operation\n
**/count (or /cnt):** Show chat history statistics with ETA for forwarding\n
**/settings (or /st):** View your current settings\n
**/stats:** View detailed statistics of the current/last forward operation\n
**/rs (or /reset):** Reset your settings and start over.\n
"""
    await message.reply(command_overview)
@app.on_message(
    filters.command("stop") & filters.user(Config.OWNER_ID) & filters.private
)
async def stop_command(client: Client, message: Message):
    """Stop the caller's running forward operation, if there is one.

    The queue registered in ``active_forwards`` is stopped and unregistered;
    when statistics exist for the user they are echoed back in the reply.
    """
    user_id = message.from_user.id

    # Guard clause: nothing registered for this user means nothing to stop.
    if user_id not in active_forwards:
        await message.reply("❌ No active forward operation to stop.")
        return

    running_queue = active_forwards[user_id]
    running_queue.stop()
    del active_forwards[user_id]

    snapshot = forward_stats.get(user_id)
    if snapshot:
        await message.reply(
            f"⚠️⚠️⚠️⚠️⚠️⚠️⚠️⚠️⚠️\n\n"
            f"** ✋ Forward process stopped by {user_id}.**\n\n{snapshot.format_progress()}\n"
            f"⚠️⚠️⚠️⚠️⚠️⚠️⚠️⚠️⚠️"
        )
    logging.info(f"Forward operation stopped by user {user_id}")
@app.on_message(
    filters.command("stats") & filters.user(Config.OWNER_ID) & filters.private
)
async def stats_command(client: Client, message: Message):
    """Reply with the statistics of the caller's current or last forward run."""
    latest = forward_stats.get(message.from_user.id)
    if not latest:
        await message.reply(
            "❌ No statistics available. Start a forward operation first."
        )
        return
    await message.reply(latest.format_progress())
def _format_eta(seconds: float) -> str:
    """Render a duration in seconds as ``{h}H:{mm}M:{ss}S`` (negatives clamp to zero)."""
    whole_seconds = max(0, int(seconds))
    hours, remainder = divmod(whole_seconds, 3600)
    minutes, secs = divmod(remainder, 60)
    return f"{hours}H:{minutes:02d}M:{secs:02d}S"


@app.on_message(
    filters.command(["count", "cnt"]) & filters.user(Config.OWNER_ID) & filters.private
)
async def count_command(client: Client, message: Message):
    """Count the source chat's history and estimate how long forwarding will take.

    Walks the whole history of the configured source chat, classifying each
    message with the same skip rule forward_command uses (service, empty,
    command, protected content), then replies with totals and an ETA.

    The ETA charges COPY_DELAY_SECONDS_MG once per media group,
    COPY_DELAY_SECONDS_M once per message that is not part of a media group,
    and 0.1 s of overhead per valid message.
    """
    user_id = message.from_user.id

    # Admin check (already handled by filter, but keeping for clarity)
    if message.from_user.id != Config.OWNER_ID:
        await message.reply("❌ You are not authorized to use this command.")
        return

    # Check if there's an active forward operation
    if user_id in active_forwards:
        await message.reply(
            "⚠️ You have an active forward operation running. "
            "Use /stop to stop it first, then check chat count."
        )
        return

    try:
        # Initialize database for user if needed
        await db.init(user_id)

        # Get user settings from database
        settings = await db.get_user_settings(user_id)
        if (
            not settings
            or not settings.get("source_chat")
            or not settings.get("target_chat")
        ):
            await message.reply(
                "❌ **No settings found**\n\n"
                "You haven't configured your forwarder yet.\n"
                "Use `/set_ids source_chat target_chat` to get started.\n\n"
                "**Example:**\n"
                "`/set_ids @sourcechannel @targetchannel`\n"
                "`/set_ids -1001234567890 -1009876543210`"
            )
            return

        source_chat = settings.get("source_chat")
        target_chat = settings.get("target_chat")

        # Validate chat access
        try:
            chat_valid, faulty_ids = await conversation_handler.is_valid_chat(
                (source_chat,)
            )
            if not chat_valid:
                await message.reply(
                    f"❌ **Cannot access source chat**\n\n"
                    f"The source chat `{source_chat}` is not accessible.\n"
                    f"Please check:\n"
                    f"• Chat ID is correct\n"
                    f"• Bot has access to the chat\n"
                    f"• Chat still exists\n\n"
                    f"Use `/set_ids` to reconfigure your settings."
                )
                return
        except Exception as chat_error:
            logging.error(f"Error validating chat {source_chat}: {chat_error}")
            await message.reply(
                f"❌ **Error accessing chat**\n\n"
                f"Could not validate access to source chat `{source_chat}`.\n"
                f"Error: {str(chat_error)}\n\n"
                f"Please try again later or use `/set_ids` to reconfigure."
            )
            return

        # Send initial progress message
        progress_msg = await message.reply(
            "🔍 **Counting messages...**\n\nPlease wait while I analyze the chat history..."
        )

        # Initialize counters
        total_messages = 0
        valid_messages = 0
        skipped_messages = 0
        grouped_messages = 0  # Valid messages that belong to some media group
        media_groups_set = set()  # Track unique media group IDs

        try:
            # Count messages in chat history
            async for msg in app.get_chat_history(source_chat):
                total_messages += 1

                # Skip invalid message types (same logic as forward command)
                if any(
                    [msg.service, msg.empty, msg.command, msg.has_protected_content]
                ):
                    skipped_messages += 1
                    continue

                # Count valid messages
                valid_messages += 1

                # Track media groups and how many messages they contain
                if msg.media_group_id:
                    media_groups_set.add(msg.media_group_id)
                    grouped_messages += 1

                # Update progress every 1000 messages
                if total_messages % 1000 == 0:
                    try:
                        await progress_msg.edit_text(
                            f"🔍 **Counting messages...**\n\n"
                            f"📊 Processed: `{total_messages:,}` messages\n"
                            f"✅ Valid: `{valid_messages:,}`\n"
                            f"⏭️ Skipped: `{skipped_messages:,}`\n"
                            f"📑 Media Groups: `{len(media_groups_set):,}`\n\n"
                            f"⏳ Please wait..."
                        )
                    except Exception as progress_error:
                        # The interim update is cosmetic; do not abort the count over it.
                        logging.warning(
                            f"Could not update count progress: {progress_error}"
                        )
        except Exception as count_error:
            logging.error(f"Error counting messages in {source_chat}: {count_error}")
            await progress_msg.edit_text(
                f"❌ **Error counting messages**\n\n"
                f"An error occurred while counting messages in the source chat.\n"
                f"Error: {str(count_error)}\n\n"
                f"This might be due to:\n"
                f"• Network connectivity issues\n"
                f"• Chat access restrictions\n"
                f"• Telegram API limits\n\n"
                f"Please try again later."
            )
            return

        # Calculate ETA based on message counts and delays.
        # Messages inside a media group are copied together, so only the
        # messages outside any group pay the per-message delay.
        total_media_groups = len(media_groups_set)
        single_messages = valid_messages - grouped_messages
        estimated_time = (
            (total_media_groups * COPY_DELAY_SECONDS_MG)
            + (single_messages * COPY_DELAY_SECONDS_M)
            + (valid_messages * 0.1)  # Processing overhead
        )

        # Get chat information for display
        try:
            chat_infos = await conversation_handler.get_chat_info(
                (source_chat, target_chat)
            )
            if len(chat_infos) >= 2:
                source_info = chat_infos[0]
                target_info = chat_infos[1]
                source_title = source_info.get("title", "Unknown")
                target_title = target_info.get("title", "Unknown")
            else:
                source_title = "Unknown"
                target_title = "Unknown"
        except Exception:
            source_title = "Unknown"
            target_title = "Unknown"

        # Format final response
        count_text = (
            f"📊 **Chat History Analysis**\n\n"
            f"📥 **Source Chat:** `{source_title}`\n"
            f"📤 **Target Chat:** `{target_title}`\n\n"
            f"📈 **Message Statistics:**\n"
            f"📋 **Total Messages:** `{total_messages:,}`\n"
            f"✅ **Valid for Forward:** `{valid_messages:,}`\n"
            f"⏭️ **Skipped (Invalid):** `{skipped_messages:,}`\n"
            f"📑 **Media Groups:** `{total_media_groups:,}`\n\n"
            f"⏱️ **Estimated Forward Time:**\n"
            f"🕒 **ETA:** `{_format_eta(estimated_time)}`\n\n"
            f"ℹ️ **Note:** ETA includes processing delays and rate limiting to prevent FloodWait errors.\n\n"
            f"**Ready to forward?** Use `/forward` to start!"
        )
        await progress_msg.edit_text(count_text)
        logging.info(
            f"Count command completed for user {user_id}: {valid_messages:,} valid messages, {total_media_groups:,} media groups"
        )
    except Exception as e:
        logging.error(f"Error in count command: {e}")
        await message.reply(
            "❌ **Count Error**\n\n"
            f"An error occurred while analyzing the chat history.\n"
            f"Error: {str(e)}\n\n"
            "Please try again later or check your settings with `/settings`."
        )
@app.on_message(
    filters.command(["settings", "st"])
    & filters.user(Config.OWNER_ID)
    & filters.private
)
async def settings_command(client: Client, message: Message):
    """Show the caller's stored source/target chats.

    Chat titles are added when they can be looked up; otherwise a plainer
    message with only the raw chat identifiers is sent.
    """
    user_id = message.from_user.id

    # The handler filter already limits this to the owner; this is a second guard.
    if message.from_user.id != Config.OWNER_ID:
        await message.reply("❌ You are not authorized to use this command.")
        return

    # Settings are not shown while a forward operation is running.
    if user_id in active_forwards:
        await message.reply(
            "⚠️ You have an active forward operation running. "
            "Use /stop to stop it first, then check your settings."
        )
        return

    try:
        await db.init(user_id)
        settings = await db.get_user_settings(user_id)

        configured = (
            settings and settings.get("source_chat") and settings.get("target_chat")
        )
        if not configured:
            await message.reply(
                "❌ **No settings found**\n\n"
                "You haven't configured your forwarder yet.\n"
                "Use `/set_ids source_chat target_chat` to get started.\n\n"
                "**Example:**\n"
                "`/set_ids @sourcechannel @targetchannel`\n"
                "`/set_ids -1001234567890 -1009876543210`"
            )
            return

        source_chat = settings.get("source_chat")
        target_chat = settings.get("target_chat")
        updated_at = settings.get("updated_at", "Unknown")

        # Used whenever the chat titles cannot be resolved.
        plain_text = (
            "⚙️ **Your Current Settings**\n\n"
            f"📥 **Source Chat:** `{source_chat}`\n"
            f"📤 **Target Chat:** `{target_chat}`\n\n"
            f"🕒 **Last Updated:** {updated_at}\n\n"
            f"⚠️ *Could not retrieve chat details*\n\n"
            f"**Commands:**\n"
            f"• Use `/forward` to start forwarding\n"
            f"• Use `/reset` to change settings\n"
            f"• Use `/stats` to view operation statistics"
        )

        try:
            chat_infos = await conversation_handler.get_chat_info(
                (source_chat, target_chat)
            )
            if len(chat_infos) < 2:
                settings_text = plain_text
            else:
                source_title = chat_infos[0].get("title", "Unknown")
                target_title = chat_infos[1].get("title", "Unknown")
                settings_text = (
                    "⚙️ **Your Current Settings**\n\n"
                    f"📥 **Source Chat:**\n"
                    f"`{source_chat}` - {source_title}\n\n"
                    f"📤 **Target Chat:**\n"
                    f"`{target_chat}` - {target_title}\n\n"
                    f"🕒 **Last Updated:** {updated_at}\n\n"
                    f"**Commands:**\n"
                    f"• Use `/forward` to start forwarding\n"
                    f"• Use `/reset` to change settings\n"
                    f"• Use `/stats` to view operation statistics"
                )
        except Exception as chat_error:
            logging.warning(f"Could not get chat info: {chat_error}")
            settings_text = plain_text

        await message.reply(settings_text)
    except Exception as e:
        logging.error(f"Error in settings command: {e}")
        await message.reply(
            "❌ An error occurred while retrieving your settings. "
            "Please try again later or use /reset to reconfigure."
        )
@app.on_message(
    filters.command(["reset", "rs"]) & filters.user(Config.OWNER_ID) & filters.private
)
async def reset_command(client: Client, message: Message):
    """Delete the caller's stored settings after a 5-second notice.

    Refuses to run while a forward operation is active or when nothing is
    configured. The outcome is written into the notice message itself.

    NOTE(review): the success text mentions a backup; this handler does not
    create one itself - presumably db.delete_user_settings does. Confirm.
    """
    user_id = message.from_user.id

    # Admin check (already handled by filter, but keeping for clarity)
    if message.from_user.id != Config.OWNER_ID:
        await message.reply("❌ You are not authorized to use this command.")
        return

    # Check if there's an active forward operation
    if user_id in active_forwards:
        await message.reply(
            "⚠️ You have an active forward operation running. "
            "Use /stop to stop it first, then try resetting your settings."
        )
        return

    try:
        # Initialize database for user if needed
        await db.init(user_id)

        # Check if user has any settings to reset
        settings = await db.get_user_settings(user_id)
        if not settings or (
            not settings.get("source_chat") and not settings.get("target_chat")
        ):
            await message.reply(
                "❌ **No settings found to reset**\n\n"
                "You don't have any configured settings yet.\n"
                "Use `/set_ids source_chat target_chat` to get started."
            )
            return

        # Notify user about reset with countdown
        reset_msg = await message.reply(
            "⚠️ **Database Reset Initiated**\n\n"
            "Your settings will be permanently deleted in **5 seconds**.\n"
            "This action cannot be undone.\n\n"
            "💾 Current settings that will be lost:\n"
            f"📥 Source: `{settings.get('source_chat', 'Not set')}`\n"
            f"📤 Target: `{settings.get('target_chat', 'Not set')}`\n\n"
            "⏳ Resetting in 5 seconds..."
        )

        # Wait for 5 seconds
        await asyncio.sleep(5)

        # Delete user settings
        delete_success = await db.delete_user_settings(user_id)
        if delete_success:
            await reset_msg.edit_text(
                "✅ **Settings Reset Complete**\n\n"
                "Your database has been successfully reset.\n"
                "All settings have been permanently deleted.\n\n"
                "🚀 **Next Steps:**\n"
                "• Use `/set_ids source_chat target_chat` to configure new settings\n"
                "• Use `/help` to see all available commands\n\n"
                "📋 A backup of your old settings has been created for safety."
            )
        else:
            await reset_msg.edit_text(
                "❌ **Reset Failed**\n\n"
                "An error occurred while resetting your settings.\n"
                "Your data may still be intact.\n\n"
                "Please try again later or contact support if the issue persists."
            )
    except Exception as e:
        # logging.error() returns None, so report the exception itself to the user.
        logging.error(f"Error in reset command: {e}")
        await message.reply(
            "❌ **Reset Error**\n\n"
            f"An error occurred while resetting your settings: {e}\n"
            "Your settings may still be intact.\n\n"
            "Please try again later or use `/settings` to check your current configuration."
        )
@app.on_message(
    filters.command(["set_ids", "set"])
    & filters.user(Config.OWNER_ID)
    & filters.private
)
async def set_ids_command(client: Client, message: Message):
    """Validate and store the source and target chats given on the command line.

    Expected form: ``/set_ids source_chat target_chat``. An existing
    configuration is never overwritten; the user is pointed to /reset instead.
    """
    user_id = message.from_user.id
    if user_id != Config.OWNER_ID:
        await message.reply("❌ You are not authorized to use this command.")
        return

    # The command may arrive as a media caption, in which case message.text is
    # None. split() without arguments also tolerates repeated whitespace.
    ids = (message.text or message.caption or "").split()

    await db.init(user_id)  # Initialize database for the user

    if len(ids) != 3:
        await message.reply(
            "❌ Invalid command format.\nUse `/set_ids source_id target_id`\n"
            "**SOURCE_CHAT:** The chat ID or username of the source chat.\n"
            "**TARGET_CHAT:** The chat ID or username of the destination chat.\n"
        )
        return

    source_chat_id = ids[1]
    target_chat_id = ids[2]

    if await db.get_user_settings(user_id):
        await message.reply(
            "⚠️ You already have a configuration.\nUse /settings to view or /reset to change it."
        )
        return

    try:
        # Validate chat IDs
        valid_check, faulty_id = await conversation_handler.is_valid_chat(
            (source_chat_id, target_chat_id)
        )
        if not valid_check:
            await message.reply(
                "❌ Invalid chat ID. Please make sure:\n"
                "1. The chat ID/username is correct\n"
                "2. You have access to the chat\n"
                f"Invalid chat IDs: {', '.join(map(str, faulty_id))}"
            )
            return

        await message.reply("✅ Valid chat IDs. Proceeding to save settings...")

        # Save settings
        await db.set_user_chats(user_id, source_chat_id, target_chat_id)
    except Exception as e:
        logging.error(f"Error in set_ids command: {e}")
        await message.reply(
            "❌ An error occurred while validating chat IDs. Please try again later."
        )
        return

    # The settings are saved at this point; a failed title lookup only
    # degrades the confirmation text and must not be reported as a
    # validation error.
    source_title = "Unknown"
    target_title = "Unknown"
    try:
        chat_infos = await conversation_handler.get_chat_info(
            (source_chat_id, target_chat_id)
        )
        if len(chat_infos) >= 2:
            source_title = chat_infos[0].get("title", "Unknown")
            target_title = chat_infos[1].get("title", "Unknown")
    except Exception as info_error:
        logging.warning(f"Could not get chat info: {info_error}")

    await message.reply(
        f"✅ Source and target chats set successfully:\n"
        f"Source: `{source_chat_id}` {source_title} \n"
        f"Target: `{target_chat_id}` {target_title}\n\n"
        f"Use /st (or /settings) to see your settings.\nUse /f (or /forward) to start forwarding files.\n"
        f"Use /rs (or /reset) to reset your settings.\n"
    )
# ------------- Forward Implementation ------------- #
class MediaGroupCollector:
    """Collect messages by ``media_group_id`` and hand each group out once.

    A group is released by ``get_complete_groups()`` as soon as two of its
    messages have been collected. Once released, the group id is remembered
    and further members of that group are rejected, so a long album cannot
    be released (and therefore copied) a second time.
    """

    def __init__(self):
        # Pending messages keyed by media group id
        self.groups: Dict[str, List[Message]] = defaultdict(list)
        # Ids of every message accepted so far (duplicate protection)
        self.message_ids: Set[int] = set()
        # Ids of groups that get_complete_groups() has already returned
        self.completed_group_ids: Set[str] = set()

    def add_message(self, message: Message) -> bool:
        """Add a message to its media group.

        Returns:
            True if the message was stored; False if it has no media group,
            was already added, or belongs to a group that was already released.
        """
        if not message.media_group_id:
            return False
        if message.id in self.message_ids:
            return False
        if message.media_group_id in self.completed_group_ids:
            # Late member of an album that has already been handed out.
            return False
        self.groups[message.media_group_id].append(message)
        self.message_ids.add(message.id)
        return True

    def get_complete_groups(self) -> List[List[Message]]:
        """Release every pending group that has at least two messages.

        Each returned group is sorted by message id, removed from the pending
        set and marked as completed.
        """
        complete = []
        for group_id, messages in list(self.groups.items()):
            if len(messages) >= 2:  # Media groups have at least 2 messages
                complete.append(sorted(messages, key=lambda m: m.id))
                self.completed_group_ids.add(group_id)
                del self.groups[group_id]
        return complete
async def forward_media_group(
    client: Client,
    media_group: List[Message],
    target_chat: str,
    source_chat: str | int | None = None,
) -> bool:
    """Copy a media group to ``target_chat`` without retries.

    The copy is requested through the id of the first message in
    ``media_group``.

    Args:
        client: Client used to perform the copy.
        media_group: Messages of the group; only the first one is referenced.
        target_chat: Destination chat.
        source_chat: Chat the group lives in. When omitted, the chat of the
            first message is used.

    Returns:
        True on success, False when the group is empty or the copy failed.

    Raises:
        FloodWait: re-raised so the caller can wait for the requested time.
    """
    if not media_group:
        logging.error("Cannot forward an empty media group.")
        return False

    anchor = media_group[0]
    try:
        await client.copy_media_group(
            chat_id=target_chat,
            from_chat_id=source_chat if source_chat is not None else anchor.chat.id,
            message_id=anchor.id,
        )
        return True
    except FloodWait:
        # Rate limiting is not an ordinary failure: let the caller's
        # FloodWait handling see it instead of swallowing it here.
        raise
    except Exception as e:
        logging.error(f"Error forwarding media group {anchor.id}: {e}")
        return False
async def forward_message(client: Client, message: Message, target_chat: str) -> bool:
    """Copy a single message to ``target_chat`` without retries.

    Returns:
        True on success, False when the copy failed for any reason other
        than rate limiting.

    Raises:
        FloodWait: re-raised so the caller can wait for the requested time.
    """
    try:
        await client.copy_message(
            chat_id=target_chat,
            from_chat_id=message.chat.id,
            message_id=message.id,
            disable_notification=True,
        )
        return True
    except FloodWait:
        # Rate limiting is not an ordinary failure: let the caller's
        # FloodWait handling see it instead of swallowing it here.
        raise
    except Exception as e:
        logging.error(f"Error forwarding {message.id}: {e}")
        return False
async def update_progress(
    message: Message, stats: ForwardStats, update_interval: int = 5
) -> None:
    """Keep ``message`` showing ``stats.format_progress()`` until cancelled.

    Runs forever; the owner is expected to cancel the task. The text is
    refreshed whenever at least ``update_interval`` seconds have passed since
    the previous attempt (checked once per second).

    A failed edit is logged and the loop continues, so a single rejected edit
    does not end progress reporting for the rest of the operation.
    """
    last_update = 0
    while True:
        if time.time() - last_update >= update_interval:
            try:
                await message.edit_text(stats.format_progress())
            except FloodWait as e:
                logging.warning(
                    f"FloodWait while updating progress: sleeping for {e.value + 1} seconds"
                )
                await asyncio.sleep(e.value + 1)
            except Exception as e:
                # asyncio.CancelledError is not an Exception subclass, so
                # task cancellation still propagates from here.
                logging.warning(f"Progress update failed: {e}")
            last_update = time.time()
        await asyncio.sleep(1)
async def _edit_quietly(progress_msg: Message, text: str) -> None:
    """Edit the progress message; log and carry on if the edit is rejected."""
    try:
        await progress_msg.edit_text(text)
    except Exception as e:
        logging.warning(f"Could not update progress message: {e}")


async def _copy_with_flood_retry(operation, description: str) -> bool:
    """Await ``operation()``; on FloodWait sleep as instructed and try once more.

    Args:
        operation: Zero-argument callable returning the copy coroutine.
        description: Text used in log messages to identify what is copied.

    Returns:
        The boolean result of the copy, or False when the retry is rate
        limited again or any other error escapes the operation.
    """
    for attempt in (1, 2):
        try:
            return await operation()
        except FloodWait as e:
            if attempt == 2:
                logging.error(
                    f"FloodWait persisted while processing {description}: {e}"
                )
                return False
            logging.warning(
                f"FloodWait encountered while processing {description}: sleeping for {e.value + 1} seconds"
            )
            await asyncio.sleep(e.value + 1)
        except Exception as e:
            logging.error(f"Error processing {description}: {e}")
            return False
    return False


@app.on_message(
    filters.command(["f", "forward", "fwd"])
    & filters.user(Config.OWNER_ID)
    & filters.private
)
async def forward_command(client: Client, message: Message):
    """Copy every eligible message from the source chat to the target chat.

    Steps:
      1. Collect the source history, skipping service, empty, command and
         protected messages, and reverse it into oldest-first order.
      2. Copy messages one by one. Messages sharing a ``media_group_id`` are
         copied once as a group, when the first member is reached; a group
         with a single collected member is copied as an ordinary message.
      3. Report either completion or that the run was stopped.

    The run is registered in ``active_forwards`` and ``forward_stats``. The
    background progress task and the registration are always released in the
    ``finally`` block, whether the run completes, is stopped, or fails.
    """
    user_id = message.from_user.id

    # Check if forward already in progress
    if user_id in active_forwards:
        await message.reply(
            "⚠️ You already have an active forward operation. Use /stop first."
        )
        return

    # Get user settings (checked before touching forward_stats so a rejected
    # request does not wipe the statistics of the previous run)
    settings = await db.get_user_settings(user_id)
    if (
        not settings
        or not settings.get("source_chat")
        or not settings.get("target_chat")
    ):
        await message.reply(
            "❌ No settings found. Use /set_ids to configure your forward settings first."
        )
        return

    source_chat = settings["source_chat"]
    target_chat = settings["target_chat"]

    progress_msg = await message.reply("⏳ Starting forward operation...")
    logging.info(
        f"Starting forward operation from {source_chat} to {target_chat} for user {user_id}"
    )

    # Register the run; everything after this point is covered by try/finally.
    stats = ForwardStats()
    forward_stats[user_id] = stats
    queue = MessageQueue()
    active_forwards[user_id] = queue
    progress_task = asyncio.create_task(update_progress(progress_msg, stats))

    def stop_requested() -> bool:
        # /stop both stops the queue and unregisters it. Comparing identity
        # also covers the case where a newer run has registered its own queue.
        return not queue._active or active_forwards.get(user_id) is not queue

    try:
        await _edit_quietly(
            progress_msg, "📥 Collecting messages in chronological order..."
        )

        # First pass: collect all messages so the order can be reversed
        all_messages = []
        async for msg in app.get_chat_history(source_chat):
            if stop_requested():
                logging.info("Forward operation stopped by user.")
                break
            # Skip invalid message types during collection
            if any([msg.service, msg.empty, msg.command, msg.has_protected_content]):
                stats.skipped += 1
                continue
            all_messages.append(msg)

        # Reverse the entire list to get chronological order (oldest first)
        all_messages.reverse()
        stats.total = len(all_messages)

        if not stop_requested() and stats.total == 0:
            progress_task.cancel()
            await _edit_quietly(
                progress_msg,
                "❌ No valid messages found to forward.\n"
                "All messages were either service messages, empty, commands, or protected content.",
            )
            return

        # The full history is in hand, so media groups can be assembled up
        # front instead of guessing when a group is complete.
        albums = defaultdict(list)
        for msg in all_messages:
            if msg.media_group_id:
                albums[msg.media_group_id].append(msg)
        handled_albums = set()

        # Update progress every 10 messages or every 5% of progress
        progress_interval = max(10, stats.total // 20)

        # Second pass: forward messages in correct chronological order
        for i, msg in enumerate(all_messages):
            if stop_requested():
                break

            group_id = msg.media_group_id
            album = albums.get(group_id) if group_id else None

            if album is not None and len(album) >= 2:
                # The whole group is copied when its first member is reached;
                # the remaining members only advance the progress counter.
                if group_id not in handled_albums:
                    handled_albums.add(group_id)
                    copied = await _copy_with_flood_retry(
                        lambda: forward_media_group(
                            client, album, target_chat, source_chat
                        ),
                        f"media group {group_id}",
                    )
                    if copied:
                        stats.processed += len(album)
                        stats.media_groups += 1
                        logging.info(
                            f"Forwarded media group of {len(album)} messages."
                        )
                        # Delay for media group forwarding
                        await asyncio.sleep(COPY_DELAY_SECONDS_MG)
                    else:
                        stats.failed += len(album)
            else:
                # Handle single messages
                copied = await _copy_with_flood_retry(
                    lambda: forward_message(client, msg, target_chat),
                    f"message {msg.id}",
                )
                if copied:
                    stats.processed += 1
                    logging.info(f"Forwarded message {msg.id} from {source_chat}.")
                    # Delay for single message forwarding
                    await asyncio.sleep(COPY_DELAY_SECONDS_M)
                else:
                    stats.failed += 1

            if (i + 1) % progress_interval == 0:
                stats.percentage = ((i + 1) / stats.total) * 100
                await _edit_quietly(progress_msg, f"{stats.format_progress()}")

        # Stop the background updater first so it cannot overwrite the final text.
        progress_task.cancel()

        if stop_requested():
            await _edit_quietly(
                progress_msg,
                f"Forwarding stopped by user {user_id}\n{stats.format_progress()}",
            )
            logging.info(f"Forward operation stopped by user {user_id}")
        else:
            await _edit_quietly(
                progress_msg,
                f"**✅ Forward operation completed!**\n\n{stats.format_progress()}",
            )
            logging.info(f"Forward operation completed for user {user_id}: ")
    except FloodWait as e:
        # Must come before the generic handler: FloodWait is an Exception too.
        logging.warning(
            f"FloodWait encountered during forward operation: sleeping for {e.value + 1} seconds"
        )
        await asyncio.sleep(e.value + 1)
        await _edit_quietly(
            progress_msg,
            f"⚠️ FloodWait encountered. Please wait for {e.value + 1} seconds before retrying the operation.",
        )
    except Exception as e:
        logging.error(f"Error in forward operation: {e}")
        await _edit_quietly(
            progress_msg,
            "❌ An error occurred during the forward operation.\n"
            "Please check the logs for details.",
        )
    finally:
        # Cleanup runs on every exit path.
        progress_task.cancel()
        if queue._active:
            queue.stop()
        # Only unregister our own queue; a newer run may own the slot by now.
        if active_forwards.get(user_id) is queue:
            del active_forwards[user_id]
# ------------- Utility Functions ------------- #
async def check_owner_id(app: Client, message: Message) -> bool:
    """Return True when Config.OWNER_ID is set; otherwise tell the user and return False."""
    owner_missing = (
        not Config.OWNER_ID
        or len(str(Config.OWNER_ID)) == 0
        or Config.OWNER_ID == 0
    )
    if not owner_missing:
        return True

    await message.reply(
        "❌ Owner ID is not set in the environment variable (.env). "
        "Please set it and restart the bot."
    )
    return False
# Run the client only when this file is executed as a script, not when imported
if __name__ == "__main__":
    app.run()