Files
TgRestrictedRepostBot/controllers.py
2025-07-28 18:16:04 +03:00

197 lines
7.5 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import asyncio
import os
import random
import shutil
import time
from asyncio import AbstractEventLoop
from time import sleep
from ethon.pyfunc import video_metadata
from pyrogram import Client, filters
from pyrogram.errors import FloodWait
from pyrogram.methods.utilities import idle
from pyrogram.raw.functions.channels import GetMessages
from pyrogram.types import Chat, Message
from config import APP_HASH, APP_ID
from helpers import screenshot
from progress import progress_for_pyrogram
class Telegram:
    """Own a Pyrogram ``Client`` and register its dot-command handlers.

    As far as this file shows, the intended flow is: construct with an event
    loop, await :meth:`create_session` to build the client and attach the
    handlers, then call :meth:`start_session` to run it until shutdown.
    """

    def __init__(self, loop):
        """Store the event loop; the client itself is created later.

        Args:
            loop: Event loop kept on ``self.event_loop``. This class only
                stores it; nothing here reads it back.
        """
        self.client = None
        self.event_loop: AbstractEventLoop = loop

    @staticmethod
    def thumbnail(sender):
        """Return ``"<sender>.jpg"`` if that file exists, otherwise ``None``.

        Static because the function never used an instance: without the
        decorator, calling it on an instance bound the instance to ``sender``.
        """
        candidate = f"{sender}.jpg"
        return candidate if os.path.exists(candidate) else None

    @staticmethod
    def _flood_wait_seconds(error):
        """Return the pause requested by a ``FloodWait`` error, in seconds.

        The delay is read from ``value`` first and from the older ``x``
        attribute as a fallback, so either spelling of the error works.
        """
        return getattr(error, "value", None) or getattr(error, "x", 0)

    @staticmethod
    def _command_argument(text):
        """Return everything after the command word, or ``None`` if absent.

        Args:
            text: Raw message text such as ``".echo hello"``; may be ``None``.
        """
        if not text:
            return None
        pieces = text.split(maxsplit=1)
        return pieces[1] if len(pieces) == 2 else None

    @staticmethod
    def _parse_message_link(link):
        """Split a message link into ``(chat, message_id)``.

        Links containing ``t.me/c/`` yield an ``int`` chat id built by
        prefixing ``-100`` to the second-to-last path segment; any other
        link yields that segment unchanged as a ``str``.

        Raises:
            ValueError: If the link has no ``/`` or a non-numeric id part.
        """
        parts = link.strip().split("/")
        if len(parts) < 2:
            raise ValueError(f"not a message link: {link!r}")
        # Drop a trailing query string such as "?single" before parsing.
        message_id = int(parts[-1].split("?", 1)[0])
        if "t.me/c/" in link:
            return int("-100" + parts[-2]), message_id
        return parts[-2], message_id

    @staticmethod
    def _rename_to_mp4(path):
        """Rename ``path`` so its extension is ``.mp4`` and return the new path.

        ``os.path.splitext`` only strips the final extension, so dots in
        directory names are left alone.
        """
        target = os.path.splitext(str(path))[0] + ".mp4"
        os.rename(path, target)
        return target

    async def create_session(self):
        """Create the Pyrogram client and attach all message handlers.

        Handlers registered:
            ``.echo <text>`` (private chats): reply with ``<text>``.
            ``.type <text>`` (private chats): send ``<text>`` and re-edit it
                character by character.
            ``.test <link>`` (own messages): re-post the linked message.
            ``.hack`` (own messages): play a progress-bar animation.
            on disconnect: keep trying to reconnect every 10 seconds.
        """
        self.client = Client("my_account", APP_ID, APP_HASH)

        # Bound once so the nested handlers can use them without ``self``.
        flood_wait_seconds = self._flood_wait_seconds
        command_argument = self._command_argument
        parse_message_link = self._parse_message_link
        rename_to_mp4 = self._rename_to_mp4

        @self.client.on_message(filters.command("echo", prefixes=".") & filters.private)
        async def echo(client: Client, message: Message):
            """Reply with the command argument, retrying after flood waits."""
            reply_text = command_argument(message.text)
            if not reply_text:
                return  # ".echo" without text: nothing to send back
            while True:
                try:
                    await message.reply(reply_text)
                    return
                except FloodWait as e:
                    await asyncio.sleep(flood_wait_seconds(e))

        @self.client.on_message(filters.command("type", prefixes=".") & filters.private)
        async def hello(client: Client, message: Message):
            """Send the argument, then replay it as a typing animation."""
            orig_text = command_argument(message.text)
            if not orig_text:
                return
            msg = await client.send_message(message.chat.id, orig_text)
            shown = orig_text.strip()  # what the message currently displays

            async def show(frame):
                """Edit the message unless the frame is empty or unchanged."""
                nonlocal shown
                visible = frame.strip()
                # Empty or identical edits are rejected by the server, so
                # skipping them keeps the animation from aborting.
                if visible and visible != shown:
                    await msg.edit(frame)
                    shown = visible

            text = orig_text
            tbp = ""  # "to be printed": the prefix typed so far
            # NOTE(review): the cursor glyph is empty in this copy of the file;
            # it may have been lost in transit - confirm against the repository.
            typing_symbol = ""
            while tbp != orig_text:
                try:
                    await show(tbp + typing_symbol)
                    await asyncio.sleep(0.05)
                    tbp += text[0]
                    text = text[1:]
                    await show(tbp)
                    await asyncio.sleep(0.05)
                except FloodWait as e:
                    await asyncio.sleep(flood_wait_seconds(e))

        @self.client.on_message(filters.command("test", prefixes=".") & filters.me)
        async def test(client: Client, msg: Message):
            """Re-post the message behind a link into the current chat.

            ``t.me/c/`` links are downloaded and re-uploaded; any other link
            is copied directly with ``copy_message``.
            """
            edit_msg = await msg.reply("Processing!")
            msg_text = command_argument(msg.text)
            try:
                if msg_text is None:
                    raise ValueError("no link given")
                chat, msg_id = parse_message_link(msg_text)
            except ValueError:
                print("Plohaya ssylka")
                return

            if isinstance(chat, int):  # numeric id means a t.me/c/ link
                file = None
                try:
                    mes = await client.get_messages(chat, msg_id)
                    print("Rabotaet")
                    edit = await client.edit_message_text(
                        msg.chat.id, edit_msg.id, "Trying to download!"
                    )
                    file = await client.download_media(
                        mes,
                        progress=progress_for_pyrogram,
                        progress_args=(
                            client,
                            "**DOWNLOADING:**\n",
                            edit,
                            time.time()
                        )
                    )
                    caption = mes.caption
                    await edit.edit('Preparing to upload!')
                    extension = os.path.splitext(str(file))[1].lstrip(".").lower()
                    if extension in ('mkv', 'mp4', 'webm'):
                        if extension in ('webm', 'mkv'):
                            file = rename_to_mp4(file)
                        data = video_metadata(file)
                        duration = data['duration']
                        thumb_path = await screenshot(file, duration, msg.from_user.id)
                        await client.send_video(
                            msg.chat.id,
                            video=file,
                            caption=caption,
                            thumb=thumb_path,
                            duration=duration,
                            caption_entities=mes.caption_entities,
                            progress=progress_for_pyrogram,
                            has_spoiler=True,
                            progress_args=(
                                client,
                                "**UPLOADING:**\n",
                                edit,
                                time.time()
                            )
                        )
                    elif extension in ('png', 'jpeg', 'jpg', 'webp'):
                        await client.send_photo(
                            msg.chat.id,
                            photo=file,
                            caption=caption,
                            caption_entities=mes.caption_entities,
                        )
                    await edit.delete()
                except ValueError:
                    print("Plohaya ssylka")
                except Exception as e:
                    print(e)
                    print("Tebya net v kanale, loh")
                finally:
                    # Remove the download even when the upload failed.
                    if file and os.path.exists(str(file)):
                        os.remove(str(file))
            else:
                await client.edit_message_text(msg.chat.id, edit_msg.id, "Cloning...")
                try:
                    await client.copy_message(msg.chat.id, chat, msg_id)
                except Exception as e:
                    print(e)
                    await client.edit_message_text(
                        msg.chat.id, edit_msg.id, f"Failed to clone: {msg_text}"
                    )
                    return
                await edit_msg.delete()

        @self.client.on_message(filters.command("hack", prefixes=".") & filters.me)
        async def hack(_, msg):
            """Animate two fake progress counters by editing the command message.

            Declared async like the other handlers so the pauses yield to the
            event loop instead of blocking.
            """

            async def count_up(label, max_step, pause):
                """Edit ``msg`` with ``label`` plus a percentage until it reaches 100."""
                perc = 0
                while perc < 100:
                    try:
                        await msg.edit(label + str(perc) + "%")
                        perc += random.randint(1, max_step)
                        await asyncio.sleep(pause)
                    except FloodWait as e:
                        await asyncio.sleep(flood_wait_seconds(e))

            await count_up("👮‍ Взлом пентагона в процессе ...", 3, 0.1)
            await msg.edit("🟢 Пентагон успешно взломан!")
            await asyncio.sleep(3)
            await msg.edit("👽 Поиск секретных данных об НЛО ...")
            await count_up("👽 Поиск секретных данных об НЛО ...", 5, 0.15)
            await msg.edit("🦖 Найдены данные о существовании динозавров на земле!")

        @self.client.on_disconnect()
        async def disconnect_handler(client, message=None):
            """Poll every 10 seconds and try to reconnect until connected."""
            while not client.is_connected:
                await asyncio.sleep(10)
                if client.is_connected:
                    print("Connected again!")
                    break
                try:
                    await client.connect()
                except Exception as e:
                    # ``Exception`` rather than a bare ``except`` so that task
                    # cancellation still propagates; the loop simply retries.
                    print(f"Reconnect failed: {e}")

    def start_session(self):
        """Run the client until it stops, then delete ``./downloads``.

        Does nothing when no client has been created. The clean-up runs in a
        ``finally`` so the directory is removed even if ``run()`` raises.
        """
        if not self.client:
            return
        try:
            self.client.run()
            print("Bot shutdown!")
        finally:
            if os.path.exists("./downloads"):
                shutil.rmtree("./downloads")