197 lines
7.5 KiB
Python
197 lines
7.5 KiB
Python
import asyncio
|
||
import os
|
||
import random
|
||
import shutil
|
||
import time
|
||
from asyncio import AbstractEventLoop
|
||
from time import sleep
|
||
|
||
from ethon.pyfunc import video_metadata
|
||
from pyrogram import Client, filters
|
||
from pyrogram.errors import FloodWait
|
||
from pyrogram.methods.utilities import idle
|
||
from pyrogram.raw.functions.channels import GetMessages
|
||
from pyrogram.types import Chat, Message
|
||
|
||
from config import APP_HASH, APP_ID
|
||
from helpers import screenshot
|
||
from progress import progress_for_pyrogram
|
||
|
||
|
||
class Telegram:
    """Thin wrapper around a Pyrogram ``Client`` that registers "." commands.

    Commands registered by :meth:`create_session`:

    * ``.echo <text>`` (private chats) - reply with ``<text>``.
    * ``.type <text>`` (private chats) - send ``<text>`` with a typing animation.
    * ``.test <link>`` (own messages) - re-upload or copy the linked message.
    * ``.hack`` (own messages) - play a joke progress animation.

    A disconnect handler that keeps trying to reconnect is registered as well.
    """

    # Lower-case extensions (without the dot) that decide how a downloaded
    # file is re-uploaded by the ``.test`` command.
    _VIDEO_EXTENSIONS = ("mkv", "mp4", "webm")
    _RENAME_TO_MP4_EXTENSIONS = ("webm", "mkv")
    _PHOTO_EXTENSIONS = ("png", "jpeg", "jpg", "webp")

    # Directory removed by start_session() once the client has stopped.
    _DOWNLOAD_DIR = "./downloads"

    def __init__(self, loop):
        """Store the event loop; the client is created by create_session().

        Args:
            loop: Event loop kept on the instance as ``event_loop``.
        """
        self.client = None
        self.event_loop: AbstractEventLoop = loop

    @staticmethod
    def thumbnail(sender):
        """Return ``"<sender>.jpg"`` if that file exists, otherwise ``None``.

        Declared as a static method so it can be called both on the class and
        on an instance (it never used any instance state).
        """
        candidate = f"{sender}.jpg"
        return candidate if os.path.exists(candidate) else None

    @staticmethod
    def _flood_delay(error):
        """Return how many seconds a ``FloodWait`` error asks us to wait.

        Pyrogram 2.x stores the delay in ``value`` while 1.x used ``x``; both
        are accepted so the handlers work with either release. Falls back to
        one second when neither attribute holds a number, so retry loops never
        spin without pausing.
        """
        for attribute in ("value", "x"):
            seconds = getattr(error, attribute, None)
            if isinstance(seconds, (int, float)):
                return seconds
        return 1

    @staticmethod
    def _command_argument(text, command):
        """Return the text following ``".<command> "`` or ``None`` if absent.

        Args:
            text: Message text; may be ``None`` (e.g. a captioned media message).
            command: Command name without the leading dot.
        """
        parts = (text or "").split(f".{command} ", maxsplit=1)
        if len(parts) == 2 and parts[1]:
            return parts[1]
        return None

    @staticmethod
    def _parse_message_link(link):
        """Split a message link into ``(chat_segment, message_id)``.

        The chat segment is the second-to-last ``/``-separated part and the
        message id is the last part converted to ``int``. Returns ``None``
        when the link has fewer than two segments or the id is not an integer.
        """
        segments = (link or "").split("/")
        if len(segments) < 2:
            return None
        try:
            return segments[-2], int(segments[-1])
        except ValueError:
            return None

    @staticmethod
    def _media_extension(path):
        """Return the lower-cased extension of *path* without the dot ('' if none)."""
        if not path:
            return ""
        return os.path.splitext(str(path))[1][1:].lower()

    async def create_session(self):
        """Create the Pyrogram client and register every command handler.

        Inside the nested handlers the first parameter (named ``self`` in most
        of them) is the Pyrogram ``Client``, not this ``Telegram`` instance,
        which is why the helpers are reached through the class name.
        """
        self.client = Client("my_account", APP_ID, APP_HASH)

        @self.client.on_message(filters.command("echo", prefixes=".") & filters.private)
        async def echo(self: Client, message: Message):
            """Reply with the text following ``.echo``, retrying after flood waits."""
            reply_text = Telegram._command_argument(message.text, "echo")
            if reply_text is None:
                # Nothing to echo (bare ".echo" or a message without text).
                return
            while True:
                try:
                    await message.reply(reply_text)
                    return
                except FloodWait as e:
                    await asyncio.sleep(Telegram._flood_delay(e))

        @self.client.on_message(filters.command("type", prefixes=".") & filters.private)
        async def hello(self: Client, message: Message):
            """Send the text following ``.type`` and "type" it one character at a time."""
            orig_text = Telegram._command_argument(message.text, "type")
            if orig_text is None:
                return

            msg = await self.send_message(message.chat.id, orig_text)

            remaining = orig_text
            typed = ""
            typing_symbol = "█"

            while typed != orig_text:
                try:
                    await msg.edit(typed + typing_symbol)
                    # asyncio.sleep keeps the event loop free for other handlers;
                    # a blocking time.sleep here would stall the whole client.
                    await asyncio.sleep(0.05)

                    typed += remaining[0]
                    remaining = remaining[1:]

                    await msg.edit(typed)
                    await asyncio.sleep(0.05)
                except FloodWait as e:
                    await asyncio.sleep(Telegram._flood_delay(e))

        @self.client.on_message(filters.command("test", prefixes=".") & filters.me)
        async def test(self: Client, msg: Message):
            """Re-post the message referenced by the link following ``.test``.

            Links containing ``t.me/c/`` are downloaded and re-uploaded as a
            video or photo depending on the file extension; any other link is
            copied directly with ``copy_message``.
            """
            edit_msg = await msg.reply("Processing!")
            msg_text = Telegram._command_argument(msg.text, "test")
            parsed = Telegram._parse_message_link(msg_text)
            if parsed is None:
                # Same report the handler already used for unparsable links.
                print("Plohaya ssylka")
                return
            chat_segment, msg_id = parsed

            if 't.me/c/' in msg_text:
                file = None
                try:
                    # Ids taken from t.me/c/ links are prefixed with -100 to
                    # form the chat id passed to get_messages.
                    chat = int('-100' + str(chat_segment))
                    mes = await self.get_messages(chat, msg_id)
                    print("Rabotaet")
                    edit = await self.edit_message_text(msg.chat.id, edit_msg.id, "Trying to download!")
                    file = await self.download_media(
                        mes,
                        progress=progress_for_pyrogram,
                        progress_args=(
                            self,
                            "**DOWNLOADING:**\n",
                            edit,
                            time.time()
                        )
                    )
                    caption = mes.caption
                    await edit.edit('Preparing to upload!')

                    extension = Telegram._media_extension(file)
                    if extension in Telegram._VIDEO_EXTENSIONS:
                        if extension in Telegram._RENAME_TO_MP4_EXTENSIONS:
                            # splitext only strips the final suffix, so dots
                            # elsewhere in the path no longer truncate it.
                            path = os.path.splitext(str(file))[0] + ".mp4"
                            os.rename(file, path)
                            file = path
                        data = video_metadata(file)
                        duration = data['duration']
                        thumb_path = await screenshot(file, duration, msg.from_user.id)
                        await self.send_video(
                            msg.chat.id,
                            video=file,
                            caption=caption,
                            thumb=thumb_path,
                            duration=duration,
                            caption_entities=mes.caption_entities,
                            progress=progress_for_pyrogram,
                            has_spoiler=True,
                            progress_args=(
                                self,
                                "**UPLOADING:**\n",
                                edit,
                                time.time()
                            )
                        )
                    elif extension in Telegram._PHOTO_EXTENSIONS:
                        await self.send_photo(
                            msg.chat.id,
                            photo=file,
                            caption=caption,
                            caption_entities=mes.caption_entities,
                        )
                    await edit.delete()
                except ValueError:
                    print("Plohaya ssylka")
                except Exception as e:
                    print(e)
                    print("Tebya net v kanale, loh")
                finally:
                    # Remove the downloaded file even when the upload failed,
                    # so aborted transfers do not pile up on disk.
                    if file and os.path.exists(str(file)):
                        os.remove(str(file))
            else:
                chat = str(chat_segment)
                await self.edit_message_text(msg.chat.id, edit_msg.id, "Cloning...")
                try:
                    await self.copy_message(msg.chat.id, chat, msg_id)
                except Exception as e:
                    print(e)
                    return await self.edit_message_text(msg.chat.id, edit_msg.id, f"Failed to clone: {msg_text}")
                await edit_msg.delete()

        @self.client.on_message(filters.command("hack", prefixes=".") & filters.me)
        def hack(_, msg):
            """Play a two-stage joke progress animation by editing the message.

            Deliberately left as a plain (non-async) function with blocking
            sleeps, exactly as originally written.
            """

            def animate(prefix, max_step, pause):
                # Edit the message with a growing percentage until it hits 100.
                perc = 0
                while perc < 100:
                    try:
                        msg.edit(prefix + str(perc) + "%")
                        perc += random.randint(1, max_step)
                        sleep(pause)
                    except FloodWait as e:
                        sleep(Telegram._flood_delay(e))

            animate("👮 Взлом пентагона в процессе ...", 3, 0.1)

            msg.edit("🟢 Пентагон успешно взломан!")
            sleep(3)

            msg.edit("👽 Поиск секретных данных об НЛО ...")
            animate("👽 Поиск секретных данных об НЛО ...", 5, 0.15)

            msg.edit("🦖 Найдены данные о существовании динозавров на земле!")

        @self.client.on_disconnect()
        async def disconnect_handler(self, message=None):
            """Poll every 10 seconds and try to reconnect until the client is connected."""
            while not self.is_connected:
                await asyncio.sleep(10)

                if self.is_connected:
                    print("Connected again!")
                    break
                try:
                    await self.connect()
                except Exception as e:
                    # Best effort: report and retry on the next iteration.
                    # A bare ``except`` would also swallow task cancellation
                    # and make the loop impossible to stop.
                    print(f"Reconnect attempt failed: {e}")

    def start_session(self):
        """Run the client until it stops, then remove the downloads directory.

        Does nothing when create_session() has not been called yet.
        """
        if self.client:
            self.client.run()
            print("Bot shutdown!")
            # One removal is enough; the original repeated this check three times.
            if os.path.exists(self._DOWNLOAD_DIR):
                shutil.rmtree(self._DOWNLOAD_DIR)