edited: methods for new api types

This commit is contained in:
2023-11-26 02:24:35 +05:00
parent dbf8838183
commit 137d3d6e79
4 changed files with 55 additions and 45 deletions

View File

@@ -1,4 +1,4 @@
from typing import Any from typing import Any, List
from aiogram import Bot, F, Router, types from aiogram import Bot, F, Router, types
from aiogram.filters import Command from aiogram.filters import Command
@@ -11,14 +11,15 @@ from handlers.filters.reply_to_user import ReplyToUser
from handlers.middlewares.user import AdminMiddleware from handlers.middlewares.user import AdminMiddleware
from handlers.states.change_post import ChangePost from handlers.states.change_post import ChangePost
from neuroapi import neuroapi from neuroapi import neuroapi
import neuroapi.types as nat
def get_post_info(post: dict, post_id: int) -> str: def get_post_info(post: nat.Post, post_id: int) -> str:
text = post["text"] text = post.text
time = post["timestamp"] time = post.timestamp
from_user = post["from_user_id"] from_user = post.from_user_id
s = f"""Индекс: {post_id}\nТекст: {text}\nВремя отправки: {time}\nОт: [id{from_user}](tg://user?id={from_user})""".replace('#', '\#').replace( s = f"""Индекс: {post_id}\nТекст: {text}\nВремя отправки: {time}\nОт: [id{from_user}](tg://user?id={from_user})""".replace('#', '\#').replace(
"_", "\_").replace('.', '\.').replace(',', '\,').replace('!', '\!').replace('-', '\-').replace(':', '\:') "_", "\_").replace('.', '\.').replace(',', '\,').replace('!', '\!').replace('-', '\-').replace(':', '\:').replace('+', '\+')
return s return s
@@ -33,18 +34,18 @@ class Admin_commands:
@self.router.message(NewPostFilter()) @self.router.message(NewPostFilter())
async def new_post(message: types.Message): async def new_post(message: types.Message):
post = await neuroapi.post.get_by_media_group_id(message.media_group_id) post: nat.Post = await neuroapi.post.get_by_media_group_id(message.media_group_id)
await neuroapi.image.add(post['uuid'], message.photo[-1].file_id, message.has_media_spoiler, message.message_id) await neuroapi.image.add(str(post.uuid), message.photo[-1].file_id, message.has_media_spoiler, message.message_id)
@self.router.message(Command('info')) @self.router.message(Command('info'))
async def info_command(message: types.Message): async def info_command(message: types.Message):
posts = await neuroapi.post.get_will_post() posts: List[nat.Post] = await neuroapi.post.get_will_post()
post_c = {} post_c = {}
for post in posts: for post in posts:
if post['from_user_id'] not in post_c: if post.from_user_id not in post_c:
post_c[post['from_user_id']] = 1 post_c[post.from_user_id] = 1
else: else:
post_c[post['from_user_id']] += 1 post_c[post.from_user_id] += 1
await message.answer(str(post_c)) await message.answer(str(post_c))
@self.router.message(ChangePosts()) @self.router.message(ChangePosts())
@@ -64,12 +65,13 @@ class Admin_commands:
text='Отмена', callback_data='cancel')] text='Отмена', callback_data='cancel')]
] ]
keyboard = types.InlineKeyboardMarkup(inline_keyboard=kb) keyboard = types.InlineKeyboardMarkup(inline_keyboard=kb)
post = await neuroapi.post.get(posts[0]['uuid']) post = await neuroapi.post.get(str(posts[0].uuid))
images = MediaGroupBuilder( images = MediaGroupBuilder(
caption=get_post_info(post, 1)) caption=get_post_info(post, 1))
for image in sorted(post['images'], key=lambda x: x['message_id']): image: nat.Image
images.add_photo(image['file_id'], for image in sorted(post.images, key=lambda x: x.message_id):
has_spoiler=image['has_spoiler'], parse_mode='markdownv2') images.add_photo(image.file_id,
has_spoiler=image.has_spoiler, parse_mode='markdownv2')
mes = await message.answer_media_group(images.build()) mes = await message.answer_media_group(images.build())
await state.update_data(edit_msg=mes[0].message_id) await state.update_data(edit_msg=mes[0].message_id)
await message.answer('Действия', reply_markup=keyboard) await message.answer('Действия', reply_markup=keyboard)
@@ -84,7 +86,7 @@ class Admin_commands:
await callback.answer() await callback.answer()
await callback.message.delete() await callback.message.delete()
return return
posts = data['posts'] posts: List[nat.Post] = data['posts']
post_id = data['id']+1 post_id = data['id']+1
select_btns = [types.InlineKeyboardButton( select_btns = [types.InlineKeyboardButton(
text='<-', callback_data='prev_post')] text='<-', callback_data='prev_post')]
@@ -100,7 +102,7 @@ class Admin_commands:
] ]
keyboard = types.InlineKeyboardMarkup(inline_keyboard=kb) keyboard = types.InlineKeyboardMarkup(inline_keyboard=kb)
await state.update_data(id=post_id) await state.update_data(id=post_id)
post = await neuroapi.post.get(posts[post_id]['uuid']) post = await neuroapi.post.get(str(posts[post_id].uuid))
await bot.edit_message_caption(caption=get_post_info(post, post_id+1), chat_id=callback.message.chat.id, message_id=data['edit_msg'], parse_mode='markdownv2') await bot.edit_message_caption(caption=get_post_info(post, post_id+1), chat_id=callback.message.chat.id, message_id=data['edit_msg'], parse_mode='markdownv2')
await callback.message.edit_reply_markup(reply_markup=keyboard) await callback.message.edit_reply_markup(reply_markup=keyboard)
await callback.answer() await callback.answer()
@@ -129,9 +131,9 @@ class Admin_commands:
if 'posts' not in data: if 'posts' not in data:
await state.clear() await state.clear()
return return
posts = data['posts'] posts: List[nat.Post] = data['posts']
post_id = data['id'] post_id = data['id']
post_uuid = posts[post_id]['uuid'] post_uuid = str(posts[post_id].uuid)
try: try:
await neuroapi.post.edit_text(post_uuid, message.text) await neuroapi.post.edit_text(post_uuid, message.text)
await message.answer(f'Текст поста изменен на: {message.text}') await message.answer(f'Текст поста изменен на: {message.text}')
@@ -147,7 +149,7 @@ class Admin_commands:
await callback.answer() await callback.answer()
await callback.message.delete() await callback.message.delete()
return return
posts = data['posts'] posts: List[nat.Post] = data['posts']
post_id = data['id']-1 post_id = data['id']-1
select_btns = [types.InlineKeyboardButton( select_btns = [types.InlineKeyboardButton(
text='->', callback_data='next_post')] text='->', callback_data='next_post')]
@@ -163,7 +165,7 @@ class Admin_commands:
] ]
keyboard = types.InlineKeyboardMarkup(inline_keyboard=kb) keyboard = types.InlineKeyboardMarkup(inline_keyboard=kb)
await state.update_data(id=post_id) await state.update_data(id=post_id)
post = await neuroapi.post.get(posts[post_id]['uuid']) post = await neuroapi.post.get(str(posts[post_id].uuid))
await bot.edit_message_caption(caption=get_post_info(post, post_id), chat_id=callback.message.chat.id, message_id=data['edit_msg'], parse_mode='markdownv2') await bot.edit_message_caption(caption=get_post_info(post, post_id), chat_id=callback.message.chat.id, message_id=data['edit_msg'], parse_mode='markdownv2')
await callback.message.edit_reply_markup(reply_markup=keyboard) await callback.message.edit_reply_markup(reply_markup=keyboard)
await callback.answer() await callback.answer()
@@ -181,19 +183,20 @@ class Admin_commands:
async def post(message: types.Message): async def post(message: types.Message):
posts = await neuroapi.post.get_will_post() posts = await neuroapi.post.get_will_post()
if (posts): if (posts):
post = await neuroapi.post.get(posts[0]['uuid']) post = await neuroapi.post.get(str(posts[0].uuid))
images = MediaGroupBuilder(caption=post['text']) images = MediaGroupBuilder(caption=post.text)
for image in sorted(post['images'], key=lambda x: x['message_id']): image: nat.Image
images.add_photo(image['file_id'], for image in sorted(post.images, key=lambda x: x.message_id):
has_spoiler=image['has_spoiler']) images.add_photo(image.file_id,
has_spoiler=image.has_spoiler)
await message.answer_media_group(images.build()) await message.answer_media_group(images.build())
else: else:
await message.answer('Нет постов') await message.answer('Нет постов')
@self.router.message(NewSoloPostFilter()) @self.router.message(NewSoloPostFilter())
async def post_solo(message: types.Message): async def post_solo(message: types.Message):
post = await neuroapi.post.new(message.caption.replace('/newpost ', ''), message.from_user.id) post: nat.Post = await neuroapi.post.new(message.caption.replace('/newpost ', ''), message.from_user.id)
await neuroapi.image.add(post['uuid'], message.photo[-1].file_id, message.has_media_spoiler, message.message_id) await neuroapi.image.add(str(post.uuid), message.photo[-1].file_id, message.has_media_spoiler, message.message_id)
await message.answer('Пост успешно добавлен!') await message.answer('Пост успешно добавлен!')
@self.router.message(ReplyToUser()) @self.router.message(ReplyToUser())

View File

@@ -1,9 +1,11 @@
from typing import Any from typing import Any, List
from aiogram import Bot, F, Router, types from aiogram import Bot, F, Router, types
from neuroapi import neuroapi from neuroapi import neuroapi
from neuroapi.types import Admin as AdminType
class User_commands: class User_commands:
bot: Bot bot: Bot
@@ -15,12 +17,12 @@ class User_commands:
@self.router.message(F.chat.type == 'private') @self.router.message(F.chat.type == 'private')
async def forward_post(message: types.Message): async def forward_post(message: types.Message):
admins = await neuroapi.admin.get() admins: List[AdminType] = await neuroapi.admin.get()
canReply = True canReply = True
for a in admins: for admin in admins:
await bot.send_message(a['user_id'], f'Вам новое сообщение от пользователя {message.from_user.full_name}. ' + await bot.send_message(admin.user_id, f'Вам новое сообщение от пользователя {message.from_user.full_name}. ' +
(f'\nНик: @{message.from_user.username}' if message.from_user.username else f'ID: {message.from_user.id}')) (f'\nНик: @{message.from_user.username}' if message.from_user.username else f'ID: {message.from_user.id}'))
forwarded_message = await bot.forward_message(a['user_id'], message.chat.id, message.message_id) forwarded_message = await bot.forward_message(admin.user_id, message.chat.id, message.message_id)
if forwarded_message.forward_from is None: if forwarded_message.forward_from is None:
canReply = False canReply = False
await message.reply('Ваше сообщение было отправлено администраторам'+('' if canReply else '\nНо они не смогут вам ответить из-за ваших настроек конфиденциальности.')) await message.reply('Ваше сообщение было отправлено администраторам'+('' if canReply else '\nНо они не смогут вам ответить из-за ваших настроек конфиденциальности.'))

View File

@@ -2,13 +2,15 @@ from aiohttp import ClientSession
from .api_method import ApiMethod from .api_method import ApiMethod
from neuroapi.types import Admin as AdminType
class Admin(ApiMethod): class Admin(ApiMethod):
async def get(self): async def get(self):
async with ClientSession() as session: async with ClientSession() as session:
response = await session.get(self.api_url+'/admin/get') response = await session.get(self.api_url+'/admin/get')
return await response.json() return [AdminType.from_dict(admin) for admin in await response.json()]
async def is_admin(self, id: str): async def is_admin(self, id: str):
async with ClientSession() as session: async with ClientSession() as session:

View File

@@ -3,6 +3,7 @@ from aiohttp import ClientSession
from .api_method import ApiMethod from .api_method import ApiMethod
from .enums import EGetAll from .enums import EGetAll
import neuroapi.types as nat
class Post(ApiMethod): class Post(ApiMethod):
@@ -15,7 +16,7 @@ class Post(ApiMethod):
data = response.json() data = response.json()
if 'statusCode' in data: if 'statusCode' in data:
raise Exception(data['message']) raise Exception(data['message'])
return data return nat.Post.from_dict(data)
async def __get_all(self, status: EGetAll): async def __get_all(self, status: EGetAll):
async with ClientSession() as session: async with ClientSession() as session:
@@ -24,15 +25,15 @@ class Post(ApiMethod):
async def get_all(self): async def get_all(self):
result = await self.__get_all(EGetAll.all) result = await self.__get_all(EGetAll.all)
return await result.json() return [nat.Post.from_dict(post) for post in await result.json()]
async def get_will_post(self): async def get_will_post(self):
result = await self.__get_all(EGetAll.will_post) result = await self.__get_all(EGetAll.will_post)
return await result.json() return [nat.Post.from_dict(post) for post in await result.json()]
async def get_posted(self): async def get_posted(self):
result = await self.__get_all(EGetAll.posted) result = await self.__get_all(EGetAll.posted)
return await result.json() return [nat.Post.from_dict(post) for post in await result.json()]
async def get(self, post_id: str): async def get(self, post_id: str):
async with ClientSession() as session: async with ClientSession() as session:
@@ -40,18 +41,20 @@ class Post(ApiMethod):
data = await response.json() data = await response.json()
if 'statusCode' in data: if 'statusCode' in data:
raise Exception(data['message']) raise Exception(data['message'])
return data return nat.Post.from_dict(data)
async def get_by_media_group_id(self, media_group_id: str): async def get_by_media_group_id(self, media_group_id: str):
response = requests.get(self.api_url+f'/post/get-by-media-group-id/{media_group_id}') response = requests.get(
self.api_url+f'/post/get-by-media-group-id/{media_group_id}')
data = response.json() data = response.json()
if 'statusCode' in data: if 'statusCode' in data:
raise Exception(data['message']) raise Exception(data['message'])
return data return nat.Post.from_dict(data)
async def edit_text(self, post_id: str, text: str): async def edit_text(self, post_id: str, text: str):
response = requests.post(self.api_url+f"/post/edit/{post_id}", data={"text": text}) response = requests.post(
self.api_url+f"/post/edit/{post_id}", data={"text": text})
data = response.json() data = response.json()
if 'statusCode' in data: if 'statusCode' in data:
raise Exception(data['message']) raise Exception(data['message'])
return data return nat.Post.from_dict(data)