Files
nwxraybot/nwxraybot/handlers/admin.py
Sergey Elpashev 926774424e
All checks were successful
ci/woodpecker/push/woodpecker Pipeline was successful
Feat: added notifier
2025-01-17 13:19:13 +03:00

104 lines
5.2 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import asyncio
import logging
import re
from datetime import datetime
from typing import Optional
from aiogram import Bot, F
from aiogram.enums import ParseMode
from aiogram.filters import Command
from aiogram.fsm.context import FSMContext
from aiogram.types import (CallbackQuery, InlineKeyboardButton,
InlineKeyboardMarkup, Message)
from nwxraybot import get_code
from nwxraybot.fsm import BroadcastStates
from nwxraybot.meta import Handler
from nwxraybot.middlewares import AdminMiddleware
from nwxraybot.models import User
class AdminHandler(Handler):
def __init__(self, bot: Optional[Bot] = None) -> None:
super().__init__(bot)
self.router.message.middleware(AdminMiddleware())
@self.router.message(Command('adduser'))
async def add_user(message: Message):
mask = r"^(?P<name>[a-zA-Z0-9]+)\s(?P<url>vless://[^\s]+)($|\s(?P<date>[0-9]{2}\.[0-9]{2}\.[0-9]{4})\s(?P<time>[0-9]{2}\:[0-9]{2})$)"
text = message.text.replace('/adduser ', '')
match = re.match(mask, text)
if match is None:
await message.reply('Вы ввели команду в неверном формате. Вводите в формате:\n``` /adduser name vless://.... 01.01.1970 00:00```', parse_mode=ParseMode.MARKDOWN)
return
user_dict = match.groupdict()
date = None
if user_dict['date']:
date = datetime.strptime(f"{user_dict['date']} {
user_dict['time']}", "%d.%m.%Y %H:%M")
code = get_code()
new_user = User(
name=user_dict['name'], url=user_dict['url'], time=date, code=code)
new_user.save()
await message.answer(f'Пользователь создан. Вот его ссылка для доступа:\n`https://t.me/nwproxybot?start={code}`', parse_mode=ParseMode.MARKDOWN)
@self.router.message(Command('updateuser'))
async def update_user(message: Message):
mask = r"^(?P<name>[a-zA-Z0-9]+)\s(?P<date>[0-9]{2}\.[0-9]{2}\.[0-9]{4})\s(?P<time>[0-9]{2}\:[0-9]{2})$"
text = message.text.replace('/updateuser ', '')
match = re.match(mask, text)
if match is None:
await message.reply('Вы ввели команду в неверном формате. Вводите в формате:\n``` /updateuser name 01.01.1970 00:00```', parse_mode=ParseMode.MARKDOWN)
return
user_dict = match.groupdict()
date = datetime.strptime(f"{user_dict['date']} {
user_dict['time']}", "%d.%m.%Y %H:%M")
query = User.update(time=date).where(
User.name == user_dict['name'])
query.execute()
await message.answer('Информация о пользователе обновлена.')
@self.router.message(Command('broadcast'))
async def start_broadcast(message: Message, state: FSMContext):
await message.answer('Отправьте для рассылки')
await state.set_state(BroadcastStates.waiting_for_message)
@self.router.message(BroadcastStates.waiting_for_message)
async def broadcast_message(message: Message, state: FSMContext):
if message.media_group_id is not None:
await state.clear()
await message.answer('Пожалуйста, не отправляйте сообщение с несколькими изображениями.')
return
await state.set_state(BroadcastStates.confirming_message)
keyboard = InlineKeyboardMarkup(inline_keyboard=[
[InlineKeyboardButton(
text="Подтвердить", callback_data='confirm_broadcast')],
[InlineKeyboardButton(
text="Отменить", callback_data='cancel_broadcast')]
])
await state.update_data(message_id=message.message_id)
await message.answer("Подтвердите отправку сообщения", reply_markup=keyboard)
@self.router.callback_query(F.data == 'confirm_broadcast', BroadcastStates.confirming_message)
async def confirm_broadcast(callback: CallbackQuery, state: FSMContext):
await callback.answer()
data = await state.get_data()
message_id = data.get('message_id')
users = User.select().where((User.telegram_id != None))
semaphore = asyncio.Semaphore(10)
for user in users:
async with semaphore:
try:
await self.bot.copy_message(user.telegram_id, callback.from_user.id, message_id)
except Exception as e:
logging.error(f"Error while broadcasting message to user {
user.telegram_id}: {e}")
await state.clear()
@self.router.callback_query(F.data == 'cancel_broadcast', BroadcastStates.confirming_message)
async def cancel_broadcast(callback: CallbackQuery, state: FSMContext):
await callback.answer()
await state.clear()
await callback.message.answer('Рассылка отменена.')