Merge pull request #6 from MrSedan/dev

Dev docker and proxy update
This commit is contained in:
2024-01-02 12:57:17 +03:00
committed by GitHub
35 changed files with 555 additions and 270 deletions

3
.dockerignore Normal file
View File

@@ -0,0 +1,3 @@
venv
.env.example
Dockerfile

View File

@@ -1,7 +1,5 @@
TOKEN=
DATABASE_PASSWORD=postgres
DATABASE_NAME=bot_db
DATABASE_USER=postgres
DATABASE_HOST=localhost
DATABASE_PORT=15432
PROXY_TOKEN=
API_URL="http://localhost:3000"

2
.gitignore vendored
View File

@@ -1,5 +1,7 @@
venv
**/__pycache__
.env.*
.env
!.env.example
data
**/*_old

9
Dockerfile Normal file
View File

@@ -0,0 +1,9 @@
FROM python:3.11-alpine
WORKDIR /app
COPY . .
RUN pip install -r requirements.txt
CMD [ "python", "main.py" ]

11
Dockerfile.dev Normal file
View File

@@ -0,0 +1,11 @@
FROM python:3.11-alpine
WORKDIR /app
COPY . .
RUN pip install -r requirements.txt
RUN pip install jurigged
CMD [ "python", "-m", "jurigged", "-v", "main.py" ]

View File

@@ -1,119 +0,0 @@
# A generic, single database configuration.
[alembic]
# path to migration scripts
script_location = alembic
# template used to generate migration file names; The default value is %%(rev)s_%%(slug)s
# Uncomment the line below if you want the files to be prepended with date and time
# see https://alembic.sqlalchemy.org/en/latest/tutorial.html#editing-the-ini-file
# for all available tokens
# file_template = %%(year)d_%%(month).2d_%%(day).2d_%%(hour).2d%%(minute).2d-%%(rev)s_%%(slug)s
# sys.path path, will be prepended to sys.path if present.
# defaults to the current working directory.
prepend_sys_path = .
# timezone to use when rendering the date within the migration file
# as well as the filename.
# If specified, requires the python-dateutil library that can be
# installed by adding `alembic[tz]` to the pip requirements
# string value is passed to dateutil.tz.gettz()
# leave blank for localtime
# timezone =
# max length of characters to apply to the
# "slug" field
# truncate_slug_length = 40
# set to 'true' to run the environment during
# the 'revision' command, regardless of autogenerate
# revision_environment = false
# set to 'true' to allow .pyc and .pyo files without
# a source .py file to be detected as revisions in the
# versions/ directory
# sourceless = false
# version location specification; This defaults
# to alembic/versions. When using multiple version
# directories, initial revisions must be specified with --version-path.
# The path separator used here should be the separator specified by "version_path_separator" below.
# version_locations = %(here)s/bar:%(here)s/bat:alembic/versions
# version path separator; As mentioned above, this is the character used to split
# version_locations. The default within new alembic.ini files is "os", which uses os.pathsep.
# If this key is omitted entirely, it falls back to the legacy behavior of splitting on spaces and/or commas.
# Valid values for version_path_separator are:
#
# version_path_separator = :
# version_path_separator = ;
# version_path_separator = space
version_path_separator = os # Use os.pathsep. Default configuration used for new projects.
# set to 'true' to search source files recursively
# in each "version_locations" directory
# new in Alembic version 1.10
# recursive_version_locations = false
# the output encoding used when revision files
# are written from script.py.mako
# output_encoding = utf-8
# sqlalchemy.url = driver://user:pass@localhost/dbname
[post_write_hooks]
# post_write_hooks defines scripts or Python functions that are run
# on newly generated revision scripts. See the documentation for further
# detail and examples
# format using "black" - use the console_scripts runner, against the "black" entrypoint
# hooks = black
# black.type = console_scripts
# black.entrypoint = black
# black.options = -l 79 REVISION_SCRIPT_FILENAME
# lint with attempts to fix using "ruff" - use the exec runner, execute a binary
# hooks = ruff
# ruff.type = exec
# ruff.executable = %(here)s/.venv/bin/ruff
# ruff.options = --fix REVISION_SCRIPT_FILENAME
# Logging configuration
[loggers]
keys = root,sqlalchemy,alembic
[handlers]
keys = console
[formatters]
keys = generic
[logger_root]
level = WARN
handlers = console
qualname =
[logger_sqlalchemy]
level = WARN
handlers =
qualname = sqlalchemy.engine
[logger_alembic]
level = INFO
handlers =
qualname = alembic
[handler_console]
class = StreamHandler
args = (sys.stderr,)
level = NOTSET
formatter = generic
[formatter_generic]
format = %(levelname)-5.5s [%(name)s] %(message)s
datefmt = %H:%M:%S
[alembic.ext]
sourceless=false

View File

@@ -1,17 +0,0 @@
version: '3.9'
services:
db:
container_name: bot_db
image: postgres:alpine
environment:
- POSTGRES_USER=${DATABASE_USER}
- POSTGRES_PASSWORD=${DATABASE_PASSWORD}
- POSTGRES_DB=${DATABASE_NAME}
- PGDATA=/var/lib/postgresql/data/pgdata
ports:
- "${DATABASE_PORT}:5432"
env_file:
- .env
volumes:
- ./data:/var/lib/postgresql/data

View File

@@ -1,52 +1,74 @@
from typing import Any
import asyncio
from typing import List
from aiogram import Bot, F, Router, types
import aioschedule as schedule
from aiogram import Bot, F, types
from aiogram.filters import Command
from aiogram.fsm.context import FSMContext
from aiogram.utils.media_group import MediaGroupBuilder
import neuroapi.types as neuroTypes
from handlers.filters.new_post import (ChangePosts, NewPostFilter,
NewSoloPostFilter)
from handlers.filters.reply_to_user import ReplyToUser
from handlers.handler import Handler
from handlers.middlewares.user import AdminMiddleware
from handlers.states.change_post import ChangePost
from neuroapi import neuroapi
from neuroapi.types import BotSettings as BotSettingsType
def get_post_info(post: dict, post_id: int) -> str:
text = post["text"]
time = post["timestamp"]
from_user = post["from_user_id"]
def get_post_info(post: neuroTypes.Post, post_id: int) -> str:
text = post.text
time = post.timestamp
from_user = post.from_user_id
s = f"""Индекс: {post_id}\nТекст: {text}\nВремя отправки: {time}\nОт: [id{from_user}](tg://user?id={from_user})""".replace('#', '\#').replace(
"_", "\_").replace('.', '\.').replace(',', '\,').replace('!', '\!').replace('-', '\-').replace(':', '\:')
"_", "\_").replace('.', '\.').replace(',', '\,').replace('!', '\!').replace('-', '\-').replace(':', '\:').replace('+', '\+')
return s
class Admin_commands:
bot: Bot
router: Router
class AdminCommands(Handler):
settings: BotSettingsType
def __init__(self, bot: Bot) -> None:
self.bot = bot
self.router = Router()
super().__init__(bot)
self.router.message.middleware(AdminMiddleware())
@self.router.message(NewPostFilter())
async def new_post(message: types.Message):
post = await neuroapi.post.get_by_media_group_id(message.media_group_id)
await neuroapi.image.add(post['uuid'], message.photo[-1].file_id, message.has_media_spoiler, message.message_id)
post: neuroTypes.Post = await neuroapi.post.get_by_media_group_id(message.media_group_id)
await neuroapi.image.add(str(post.uuid), message.photo[-1].file_id, message.has_media_spoiler, message.message_id)
@self.router.message(Command('info'))
async def info_command(message: types.Message):
posts = await neuroapi.post.get_will_post()
posts: List[neuroTypes.Post] = await neuroapi.post.get_will_post()
admins: List[neuroTypes.Admin] = await neuroapi.admin.get()
post_c = {}
for post in posts:
if post['from_user_id'] not in post_c:
post_c[post['from_user_id']] = 1
if post.from_user_id not in post_c:
post_c[post.from_user_id] = 1
else:
post_c[post['from_user_id']] += 1
await message.answer(str(post_c))
post_c[post.from_user_id] += 1
res = "Количество постов от админов:\n"
res2 = "\nПосты:\n"
for admin in admins:
if admin.user_id in post_c:
res += f'[{admin.user_name}](tg://user?id={admin.user_id}): {post_c[admin.user_id]}\n'
else:
res += f'[{admin.user_name}](tg://user?id={admin.user_id}): 0\n'
admin_posts = list(
filter(lambda x: x.from_user_id == admin.user_id, posts))
res2 += f'Посты от {admin.user_name}:\n'
if len(admin_posts):
for i, post in enumerate(admin_posts):
#TODO: Если возможно, сделать чтоб было ссылкой на сообщений с /newpost
res2 += f'{i+1}. {post.text}\n'
else:
res2 += 'Их нет\)\n'
await message.answer((res+res2).replace('#', '\#').replace("_", "\_").replace('.', '\.').replace(',', '\,').replace('!', '\!'), parse_mode='markdownv2')
"""
TODO: Изменение постов сделать нормально, не через редактирование сообщений
@self.router.message(ChangePosts())
async def change_post(message: types.Message, state: FSMContext):
posts = await neuroapi.post.get_will_post()
@@ -64,12 +86,13 @@ class Admin_commands:
text='Отмена', callback_data='cancel')]
]
keyboard = types.InlineKeyboardMarkup(inline_keyboard=kb)
post = await neuroapi.post.get(posts[0]['uuid'])
post = await neuroapi.post.get(str(posts[0].uuid))
images = MediaGroupBuilder(
caption=get_post_info(post, 1))
for image in sorted(post['images'], key=lambda x: x['message_id']):
images.add_photo(image['file_id'],
has_spoiler=image['has_spoiler'], parse_mode='markdownv2')
image: neuroTypes.Image
for image in sorted(post.images, key=lambda x: x.message_id):
images.add_photo(image.file_id,
has_spoiler=image.has_spoiler, parse_mode='markdownv2')
mes = await message.answer_media_group(images.build())
await state.update_data(edit_msg=mes[0].message_id)
await message.answer('Действия', reply_markup=keyboard)
@@ -84,7 +107,7 @@ class Admin_commands:
await callback.answer()
await callback.message.delete()
return
posts = data['posts']
posts: List[neuroTypes.Post] = data['posts']
post_id = data['id']+1
select_btns = [types.InlineKeyboardButton(
text='<-', callback_data='prev_post')]
@@ -100,7 +123,7 @@ class Admin_commands:
]
keyboard = types.InlineKeyboardMarkup(inline_keyboard=kb)
await state.update_data(id=post_id)
post = await neuroapi.post.get(posts[post_id]['uuid'])
post = await neuroapi.post.get(str(posts[post_id].uuid))
await bot.edit_message_caption(caption=get_post_info(post, post_id+1), chat_id=callback.message.chat.id, message_id=data['edit_msg'], parse_mode='markdownv2')
await callback.message.edit_reply_markup(reply_markup=keyboard)
await callback.answer()
@@ -129,9 +152,9 @@ class Admin_commands:
if 'posts' not in data:
await state.clear()
return
posts = data['posts']
posts: List[neuroTypes.Post] = data['posts']
post_id = data['id']
post_uuid = posts[post_id]['uuid']
post_uuid = str(posts[post_id].uuid)
try:
await neuroapi.post.edit_text(post_uuid, message.text)
await message.answer(f'Текст поста изменен на: {message.text}')
@@ -147,7 +170,7 @@ class Admin_commands:
await callback.answer()
await callback.message.delete()
return
posts = data['posts']
posts: List[neuroTypes.Post] = data['posts']
post_id = data['id']-1
select_btns = [types.InlineKeyboardButton(
text='->', callback_data='next_post')]
@@ -163,7 +186,7 @@ class Admin_commands:
]
keyboard = types.InlineKeyboardMarkup(inline_keyboard=kb)
await state.update_data(id=post_id)
post = await neuroapi.post.get(posts[post_id]['uuid'])
post = await neuroapi.post.get(str(posts[post_id].uuid))
await bot.edit_message_caption(caption=get_post_info(post, post_id), chat_id=callback.message.chat.id, message_id=data['edit_msg'], parse_mode='markdownv2')
await callback.message.edit_reply_markup(reply_markup=keyboard)
await callback.answer()
@@ -176,24 +199,32 @@ class Admin_commands:
data = await state.get_data()
if 'edit_msg' in data:
await bot.delete_message(message_id=data['edit_msg'], chat_id=callback.message.chat.id)
"""
@self.router.message(Command('post'))
async def post(message: types.Message):
posts = await neuroapi.post.get_will_post()
if (posts):
post = await neuroapi.post.get(posts[0]['uuid'])
images = MediaGroupBuilder(caption=post['text'])
for image in sorted(post['images'], key=lambda x: x['message_id']):
images.add_photo(image['file_id'],
has_spoiler=image['has_spoiler'])
await message.answer_media_group(images.build())
else:
async def post(message: types.Message | None = None):
try:
post = await neuroapi.post.get_post_to_post()
if (post):
images = MediaGroupBuilder(
caption=post.text + '\n\nПредложка: @neur0w0men_reply_bot')
image: neuroTypes.Image
for image in sorted(post.images, key=lambda x: x.message_id):
images.add_photo(image.file_id,
has_spoiler=image.has_spoiler)
await self.bot.send_media_group(self.settings.channel, images.build())
if message:
await message.answer('Пост успешно опубликован!')
elif message:
await message.answer('Нет постов')
except Exception as e:
if message:
await message.answer(f'Ошибка {e}')
@self.router.message(NewSoloPostFilter())
async def post_solo(message: types.Message):
post = await neuroapi.post.new(message.caption.replace('/newpost ', ''), message.from_user.id)
await neuroapi.image.add(post['uuid'], message.photo[-1].file_id, message.has_media_spoiler, message.message_id)
post: neuroTypes.Post = await neuroapi.post.new(message.caption.replace('/newpost ', ''), message.from_user.id)
await neuroapi.image.add(str(post.uuid), message.photo[-1].file_id, message.has_media_spoiler, message.message_id)
await message.answer('Пост успешно добавлен!')
@self.router.message(ReplyToUser())
@@ -207,9 +238,28 @@ class Admin_commands:
except Exception as e:
print(e)
def __call__(self, *args: Any, **kwds: Any) -> Router:
return self.router
@self.router.message(Command('update_settings'))
async def update_settings(mes: types.Message | None = None):
self.settings = await neuroapi.bot_settings.get()
schedule.clear()
schedule.every().minute.do(update_settings, None)
# TODO: Сделать в бэке и в боте, чтоб дни тоже можно было в настройках хранить
for i in self.settings.message_times:
schedule.every().monday.at(i).do(post, None)
schedule.every().tuesday.at(i).do(post, None)
schedule.every().wednesday.at(i).do(post, None)
schedule.every().thursday.at(i).do(post, None)
schedule.every().friday.at(i).do(post, None)
if i not in ['10:00', '20:00']:
schedule.every().sunday.at(i).do(post, None)
if mes:
await mes.answer('Настройки обновлены!')
def setup(bot: Bot) -> Router:
return Admin_commands(bot)()
async def settings_and_schedule_checker():
await update_settings()
while 1:
await schedule.run_pending()
await asyncio.sleep(1)
asyncio.create_task(settings_and_schedule_checker())

16
handlers/handler.py Normal file
View File

@@ -0,0 +1,16 @@
from typing import Any
from aiogram import Bot, Router
class Handler:
bot: Bot
router: Router
def __init__(self, bot: Bot) -> None:
assert isinstance(bot, Bot)
self.bot = bot
self.router = Router()
def __call__(self) -> Router:
return self.router

View File

@@ -11,6 +11,8 @@ class AdminMiddleware(BaseMiddleware):
pass
async def __call__(self, handler: Callable[[Message, Dict[str, Any]], Awaitable[Any]], event: Message, data: Dict[str, Any]) -> Any:
if event.chat.type not in ['private', 'group']:
return None
await neuroapi.user.get(str(event.from_user.id), event.from_user.username)
isAdmin = await neuroapi.admin.is_admin(str(event.from_user.id))
if not isAdmin:

View File

@@ -1,33 +1,26 @@
from typing import Any
from typing import List
from aiogram import Bot, F, Router, types
from aiogram import Bot, F, types
from handlers.handler import Handler
from neuroapi import neuroapi
from neuroapi.types import Admin as AdminType
class User_commands:
bot: Bot
router: Router
class UserCommands(Handler):
def __init__(self, bot: Bot) -> None:
self.bot = bot
self.router = Router()
super().__init__(bot)
@self.router.message(F.chat.type == 'private')
async def forward_post(message: types.Message):
admins = await neuroapi.admin.get()
admins: List[AdminType] = await neuroapi.admin.get()
canReply = True
for a in admins:
await bot.send_message(a['user_id'], f'Вам новое сообщение от пользователя {message.from_user.full_name}. ' +
for admin in admins:
await bot.send_message(admin.user_id, f'Вам новое сообщение от пользователя {message.from_user.full_name}. ' +
(f'\nНик: @{message.from_user.username}' if message.from_user.username else f'ID: {message.from_user.id}'))
forwarded_message = await bot.forward_message(a['user_id'], message.chat.id, message.message_id)
forwarded_message = await bot.forward_message(admin.user_id, message.chat.id, message.message_id)
if forwarded_message.forward_from is None:
canReply = False
await message.reply('Ваше сообщение было отправлено администраторам'+('' if canReply else '\nНо они не смогут вам ответить из-за ваших настроек конфиденциальности.'))
def __call__(self, *args: Any, **kwds: Any) -> Router:
return self.router
def setup(bot: Bot) -> Router:
return User_commands(bot)()

80
main.py
View File

@@ -1,43 +1,65 @@
import asyncio
import logging
import os
import platform
import signal
import sys
from os.path import dirname, join
import aioschedule as schedule
import dotenv
from aiogram import Bot, Dispatcher, F, types
from aiogram.filters import Command, CommandStart
from aiogram.fsm.context import FSMContext
from aiogram.fsm.state import State, StatesGroup
from aiogram.utils.keyboard import InlineKeyboardBuilder
import aiohttp
from handlers.admin_commands import Admin_commands
from handlers.admin_commands import AdminCommands
from handlers.user_commands import UserCommands
from neuroapi.config import Config
from neuroapi.types import NeuroApiBot
dotenv.load_dotenv()
token = os.getenv('TOKEN')
bot = Bot(token)
dp = Dispatcher()
@dp.message(CommandStart())
async def start_message(message: types.Message):
await message.answer('Абоба')
handlers_dir = join(dirname(__file__), 'handlers')
for filename in os.listdir(handlers_dir):
if filename.endswith('.py'):
module_name = filename[:-3]
setup = __import__(f"handlers.{module_name}", locals(), globals(), ['setup']).setup
dp.include_router(setup(bot))
async def delay_bot()->None:
if Config().token is None:
print('Delay bot needs token in environment')
return
bot = NeuroApiBot(Config().token)
bot.include_router(AdminCommands, UserCommands)
await bot.start()
async def proxy_bot()->None:
if Config().proxy_token is None:
print('Proxy bot needs token in environment')
return
bot = NeuroApiBot(Config().proxy_token)
bot.include_router()
await bot.start()
async def main() -> None:
# dp.include_router(Admin_commands(bot)())
await dp.start_polling(bot, skip_updates=True)
for i in range(5):
print(f'Checking connectivity to backend ({i+1}/5)...')
try:
async with aiohttp.ClientSession() as session:
response = await session.get(Config().api_url+'/ping')
data = str(await response.content.read(), encoding='utf-8')
if data == 'pong':
print('Successfully connected to backend')
break
else:
raise TimeoutError()
except:
print('Error! Waiting 3 secs and retrying...')
await asyncio.sleep(3)
tasks = [asyncio.create_task(delay_bot()), asyncio.create_task(proxy_bot())]
await asyncio.gather(*tasks)
if __name__ == '__main__':
logging.basicConfig(level=logging.INFO, stream=sys.stdout)
loop = asyncio.get_event_loop()
if platform.system() == 'Windows':
try:
loop.run_until_complete(main())
except KeyboardInterrupt:
print("KeyboardInterrupt occurred")
finally:
loop.close()
else:
for signame in ('SIGINT', 'SIGTERM'):
loop.add_signal_handler(getattr(signal, signame), loop.stop)
try:
asyncio.run(main())
except KeyboardInterrupt:
pass

View File

@@ -1,17 +1 @@
from .post import Post
from .admin import Admin
from .user import User
from .image import Image
from dotenv import load_dotenv
import os
from os.path import join, dirname
load_dotenv(join(dirname(__file__), "..", '.env'))
class neuroapi:
post = Post(os.environ.get('API_URL'))
admin = Admin(os.environ.get('API_URL'))
user = User(os.environ.get('API_URL'))
image = Image(os.environ.get('API_URL'))
from ._neuroapi import neuroapi

View File

@@ -2,13 +2,15 @@ from aiohttp import ClientSession
from .api_method import ApiMethod
from neuroapi.types import Admin as AdminType
class Admin(ApiMethod):
async def get(self):
async with ClientSession() as session:
response = await session.get(self.api_url+'/admin/get')
return await response.json()
return [AdminType.from_dict(admin) for admin in await response.json()]
async def is_admin(self, id: str):
async with ClientSession() as session:

View File

@@ -0,0 +1,8 @@
from ..config import Config
class ApiMethod:
api_url: str
def __init__(self) -> None:
self.api_url = Config().api_url

View File

@@ -0,0 +1,13 @@
from aiohttp import ClientSession
from neuroapi.types import BotSettings as BotSettingsType
from .api_method import ApiMethod
class BotSettings(ApiMethod):
async def get(self)-> BotSettingsType:
async with ClientSession() as session:
response = await session.get(self.api_url+'/settings')
settings = BotSettingsType.from_dict(await response.json())
return settings

View File

@@ -1,6 +1,8 @@
import requests
from aiohttp import ClientSession
import neuroapi.types as neuroTypes
from .api_method import ApiMethod
from .enums import EGetAll
@@ -15,7 +17,7 @@ class Post(ApiMethod):
data = response.json()
if 'statusCode' in data:
raise Exception(data['message'])
return data
return neuroTypes.Post.from_dict(data)
async def __get_all(self, status: EGetAll):
async with ClientSession() as session:
@@ -24,15 +26,15 @@ class Post(ApiMethod):
async def get_all(self):
result = await self.__get_all(EGetAll.all)
return await result.json()
return [neuroTypes.Post.from_dict(post) for post in await result.json()]
async def get_will_post(self):
result = await self.__get_all(EGetAll.will_post)
return await result.json()
return [neuroTypes.Post.from_dict(post) for post in await result.json()]
async def get_posted(self):
result = await self.__get_all(EGetAll.posted)
return await result.json()
return [neuroTypes.Post.from_dict(post) for post in await result.json()]
async def get(self, post_id: str):
async with ClientSession() as session:
@@ -40,18 +42,30 @@ class Post(ApiMethod):
data = await response.json()
if 'statusCode' in data:
raise Exception(data['message'])
return data
return neuroTypes.Post.from_dict(data)
async def get_by_media_group_id(self, media_group_id: str):
response = requests.get(self.api_url+f'/post/get-by-media-group-id/{media_group_id}')
response = requests.get(
self.api_url+f'/post/get-by-media-group-id/{media_group_id}')
data = response.json()
if 'statusCode' in data:
raise Exception(data['message'])
return data
return neuroTypes.Post.from_dict(data)
async def edit_text(self, post_id: str, text: str):
response = requests.post(self.api_url+f"/post/edit/{post_id}", data={"text": text})
response = requests.post(
self.api_url+f"/post/edit/{post_id}", data={"text": text})
data = response.json()
if 'statusCode' in data:
raise Exception(data['message'])
return data
return neuroTypes.Post.from_dict(data)
async def get_post_to_post(self):
response = requests.get(self.api_url+f"/post/post")
data = response.json()
if 'statusCode' in data:
if response.status_code==404:
return None
else:
raise Exception(data['message'])
return neuroTypes.Post.from_dict(data)

View File

@@ -1,4 +1,5 @@
from aiohttp import ClientSession
from .api_method import ApiMethod

13
neuroapi/_neuroapi.py Normal file
View File

@@ -0,0 +1,13 @@
from ._methods.admin import Admin
from ._methods.bot_settings import BotSettings
from ._methods.image import Image
from ._methods.post import Post
from ._methods.user import User
class neuroapi:
post = Post()
admin = Admin()
user = User()
image = Image()
bot_settings = BotSettings()

View File

@@ -1,5 +0,0 @@
class ApiMethod:
api_url: str
def __init__(self, api_url: str) -> None:
self.api_url = api_url

45
neuroapi/config.py Normal file
View File

@@ -0,0 +1,45 @@
import os
import tomllib
from typing import List, Optional
from attr import dataclass
from dotenv import load_dotenv
from neuroapi.types import Singleton
from .types._helpers import *
@dataclass
class Settings:
time: List[str]
@staticmethod
def from_dict(obj: Any) -> 'Settings':
assert isinstance(obj, dict)
time = from_list(from_str, obj.get("time", []))
return Settings(time)
def to_dict(self) -> dict:
result: dict = {}
result['time'] = from_list(from_str, self.time)
return result
class Config(Singleton):
api_url: str
settings: Settings
token: Optional[str]
proxy_token: Optional[str]
def __init__(self):
load_dotenv(os.path.join(os.path.dirname(__file__), '..', '.env'))
if not os.path.exists(os.path.join(os.path.dirname(__file__), '..', 'settings.toml')): raise Exception('Settings.toml must be in root folder')
with open(os.path.join(os.path.dirname(__file__), '..', 'settings.toml'), 'rb') as f:
settings = tomllib.load(f)
self.settings = Settings.from_dict(settings)
self.api_url = os.environ.get('API_URL')
self.token = os.environ.get('TOKEN')
if self.token == '':
self.token = None
self.proxy_token = os.environ.get('PROXY_TOKEN')
if self.proxy_token == '':
self.proxy_token = None

View File

@@ -0,0 +1,7 @@
from ._admin import Admin
from ._bot import NeuroApiBot
from ._bot_settings import BotSettings
from ._image import Image
from ._post import Post
from ._singleton import Singleton
from ._user import User

23
neuroapi/types/_admin.py Normal file
View File

@@ -0,0 +1,23 @@
from dataclasses import dataclass
from typing import Any
from ._helpers import *
@dataclass
class Admin:
user_id: int
user_name: str
@staticmethod
def from_dict(obj: Any) -> 'Admin':
assert isinstance(obj, dict)
user_id = int(from_str(obj.get("user_id")))
user_name = from_str(obj.get("user_name"))
return Admin(user_id, user_name)
def to_dict(self) -> dict:
result: dict = {}
result["user_id"] = from_str(str(self.user_id))
result["user_name"] = from_str(self.user_name)
return result

28
neuroapi/types/_bot.py Normal file
View File

@@ -0,0 +1,28 @@
from aiogram import Bot, Dispatcher
from handlers.handler import Handler
class NeuroApiBot:
bot: Bot
dp: Dispatcher
_instances = {}
def __init__(self, token: str) -> None:
self.bot = Bot(token)
self.dp = Dispatcher()
def __new__(cls, token: str) -> 'NeuroApiBot':
assert isinstance(token, str)
if token not in cls._instances:
cls._instances[token] = super(NeuroApiBot, cls).__new__(cls)
return cls._instances[token]
def include_router(self, *routerClasses: Handler) -> None:
for routerClass in routerClasses:
assert issubclass(routerClass, Handler)
self.dp.include_routers(routerClass(self.bot)())
async def start(self, skip_updates=True):
await self.dp.start_polling(self.bot, skip_updates=skip_updates)

View File

@@ -0,0 +1,29 @@
from dataclasses import dataclass
from uuid import UUID
from ._helpers import *
@dataclass
class BotSettings:
uuid: UUID
message_times: List[str]
channel: str
is_active: bool
@staticmethod
def from_dict(obj: Any) -> 'BotSettings':
assert isinstance(obj, dict)
uuid = UUID(obj.get("uuid"))
message_times = from_list(from_str, obj.get("messageTimes"))
channel = from_str(obj.get("channel"))
is_active = from_bool(obj.get("isActive"))
return BotSettings(uuid, message_times, channel, is_active)
def to_dict(self) -> dict:
result: dict = {}
result["uuid"] = str(self.uuid)
result["messageTimes"] = from_list(from_str, self.message_times)
result["channel"] = from_str(self.channel)
result["isActive"] = from_bool(self.is_active)
return result

View File

@@ -0,0 +1,47 @@
from datetime import datetime
from typing import Any, Callable, List, TypeVar, Type, cast
import dateutil.parser
T = TypeVar("T")
def from_bool(x: Any) -> bool:
assert isinstance(x, bool)
return x
def from_str(x: Any) -> str:
assert isinstance(x, str)
return x
def from_union(fs, x):
for f in fs:
try:
return f(x)
except:
pass
assert False
def from_none(x: Any) -> Any:
assert x is None
return x
def from_list(f: Callable[[Any], T], x: Any) -> List[T]:
assert isinstance(x, list)
return [f(y) for y in x]
def from_datetime(x: Any) -> datetime:
return dateutil.parser.parse(x)
def to_class(c: Type[T], x: Any) -> dict:
assert isinstance(x, c)
return cast(Any, x).to_dict()
def from_int(x: Any) -> int:
assert isinstance(x, int) and not isinstance(x, bool)
return x

28
neuroapi/types/_image.py Normal file
View File

@@ -0,0 +1,28 @@
from dataclasses import dataclass
from uuid import UUID
from ._helpers import *
@dataclass
class Image:
message_id: int
file_id: str
has_spoiler: bool
post_uuid: UUID
@staticmethod
def from_dict(obj: Any) -> 'Image':
assert isinstance(obj, dict)
message_id = from_int(obj.get("message_id"))
file_id = from_str(obj.get("file_id"))
has_spoiler = from_bool(obj.get("has_spoiler"))
post_uuid = UUID(obj.get("post_uuid"))
return Image(message_id, file_id, has_spoiler, post_uuid)
def to_dict(self) -> dict:
result: dict = {}
result["message_id"] = from_int(self.message_id)
result["file_id"] = from_str(self.file_id)
result["has_spoiler"] = from_bool(self.has_spoiler)
result["post_uuid"] = str(self.post_uuid)
return result

44
neuroapi/types/_post.py Normal file
View File

@@ -0,0 +1,44 @@
from dataclasses import dataclass
from uuid import UUID
from datetime import datetime
from typing import Any, List, Optional
from ._image import Image
from ._helpers import *
@dataclass
class Post:
uuid: UUID
posted: bool
text: str
media_group_id: int | str
timestamp: datetime
from_user_id: int
images: Optional[List[Image]] = None
@staticmethod
def from_dict(obj: Any) -> 'Post':
assert isinstance(obj, dict)
uuid = UUID(obj.get("uuid"))
posted = from_bool(obj.get("posted"))
text = from_str(obj.get("text"))
media_group_id = from_str(obj.get("media_group_id")) if obj.get(
"media_group_id") is not None else 'None'
timestamp = from_datetime(obj.get("timestamp"))
from_user_id = int(from_str(obj.get("from_user_id")))
images = from_union([lambda x: from_list(
Image.from_dict, x), from_none], obj.get("images"))
return Post(uuid, posted, text, media_group_id, timestamp, from_user_id, images)
def to_dict(self) -> dict:
result: dict = {}
result["uuid"] = str(self.uuid)
result["posted"] = from_bool(self.posted)
result["text"] = from_str(self.text)
result["media_group_id"] = from_str(str(self.media_group_id))
result["timestamp"] = self.timestamp.isoformat()
result["from_user_id"] = from_str(str(self.from_user_id))
if self.images is not None:
result["images"] = from_union([lambda x: from_list(
lambda x: to_class(Image, x), x), from_none], self.images)
return result

View File

@@ -0,0 +1,10 @@
from typing import Self
class Singleton:
_instances = {}
def __new__(cls, *args, **kwargs) -> Self:
if cls not in cls._instances:
cls._instances[cls] = super(Singleton, cls).__new__(cls)
return cls._instances[cls]

22
neuroapi/types/_user.py Normal file
View File

@@ -0,0 +1,22 @@
from dataclasses import dataclass
from typing import Any
from ._helpers import *
@dataclass
class User:
id: int
username: str
@staticmethod
def from_dict(obj: Any) -> 'User':
assert isinstance(obj, dict)
id = int(from_str(obj.get("id")))
username = from_str(obj.get("username"))
return User(id, username)
def to_dict(self) -> dict:
result: dict = {}
result["id"] = from_str(str(self.id))
result["username"] = from_str(self.username)
return result

View File

@@ -1,5 +1,6 @@
aiogram==3.1.1
aioschedule @ https://github.com/AleksHeller/python-aioschedule/archive/refs/heads/master.zip
python-dotenv==1.0.0
SQLAlchemy==2.0.22
alembic==1.12.1
requests==2.31.0
python-dateutil==2.8.2
aiohttp==3.8.6

1
settings.toml Normal file
View File

@@ -0,0 +1 @@
time = ["12:00", "14:00", "16:00", "18:00"]