"""Telegram bot handlers that walk a user through an inventory dialogue."""

import logging

from asgiref.sync import sync_to_async
from django.conf import settings
from telegram import (
    ForceReply,
    Update,
    ReplyParameters,
    ReplyKeyboardMarkup,
    ReplyKeyboardRemove,
    KeyboardButton,
    InlineKeyboardMarkup,
    InlineKeyboardButton,
)
from telegram.constants import ParseMode, ChatType, MessageEntityType
from telegram.ext import (
    Application,
    CommandHandler,
    MessageHandler,
    filters,
    CallbackContext,
)
from telegram.helpers import escape_markdown

from tmc.models import CustomTable, BaseCustomField

from .apps import TgBotClass
from .models import Element, Item

logger = logging.getLogger("root")


def chunk(n, l):
    """Split the sequence *l* into consecutive slices of at most *n* items.

    The last slice may be shorter than *n*. An empty *l* yields ``[]``.
    Raises ``ValueError`` when *n* is 0 (``range`` rejects a zero step).
    """
    return [l[i : i + n] for i in range(0, len(l), n)]


def _md(text):
    """Escape *text* so it can be embedded as plain MarkdownV2 text."""
    return escape_markdown(str(text), version=2)


def _md_code(text):
    """Escape *text* so it can be embedded inside a MarkdownV2 code span."""
    return escape_markdown(
        str(text), version=2, entity_type=MessageEntityType.CODE
    )


def _reply_to(message):
    """Build reply parameters that quote *message*."""
    return ReplyParameters(message_id=message.message_id)


def _keyboard(labels, per_row):
    """Build a reply keyboard with *per_row* buttons per row from *labels*."""
    return ReplyKeyboardMarkup(
        [[KeyboardButton(label) for label in row] for row in chunk(per_row, labels)]
    )


class TgBotApp:
    """Holds the telegram ``Application`` and the inventory dialogue handlers.

    The dialogue position is kept in ``context.chat_data`` under the keys
    ``"step"``, ``"inv"``, ``"tmc"`` and ``"element"``.
    """

    # telegram.ext.Application instance; set by init_tg().
    _app = None

    async def start(self, update: Update, context: CallbackContext):
        """Reply to /start with a short description of the commands."""
        message = update.message
        if message is None:
            # e.g. an edited message: there is nothing to reply to.
            return
        await message.reply_markdown_v2(
            (
                "Это бот для проведения инвентаризации\n"
                # "-" is reserved in MarkdownV2, hence the literal backslashes.
                "/my \\-\\- продолжить инвентаризацию\n"
                "/inv \\-\\- начать новую инвентаризацию"
            ),
            reply_parameters=_reply_to(message),
        )

    async def my(self, update: Update, context: CallbackContext):
        """Reply to /my with the list of the user's inventories."""
        message = update.message
        if message is None:
            return
        user = update.effective_user
        current_step = context.chat_data.get("step", None)
        logger.info("Step %s from user %s", current_step, user.full_name)

        items = [item async for item in Item.objects.filter(user_id=user.id)]
        # The header is prepended once; only the entries are newline-joined.
        listing = "\n".join(f"`/inv_{item.id}`" for item in items)
        await message.reply_markdown_v2(
            "Ваши инвентаризации\n" + listing,
            reply_parameters=_reply_to(message),
        )

    async def inv(self, update: Update, context: CallbackContext):
        """Advance the inventory dialogue by one step.

        Dispatches on ``context.chat_data["step"]``:
        no step + ``/inv`` -> ``"name"`` -> ``"add_tmc"`` -> ``"add_field"``
        -> ``"add_field_data"``. When a call does not move the dialogue to a
        different step, ``"step"`` and ``"inv"`` are reset to ``None``.
        """
        message = update.message
        if message is None:
            # e.g. an edited message: it carries no new dialogue input.
            return
        user = update.effective_user
        current_step = context.chat_data.get("step", None)
        logger.info("Step %s from user %s", current_step, user.full_name)

        # Steps are compared by value; identity ("is") on str is unreliable.
        if not current_step and message.text == "/inv":
            await self._begin_inventory(message, user, context)
        elif current_step == "name":
            await self._save_object_name(message, context)
        elif current_step == "add_tmc":
            await self._select_tmc(message, context)
        elif current_step == "add_field":
            await self._select_field(message, context)
        elif current_step == "add_field_data":
            await self._save_field_data(message, context)

        # The step did not change during this call: the dialogue is finished
        # (or the input was not part of it), so drop the current inventory.
        if "step" in context.chat_data and context.chat_data["step"] == current_step:
            context.chat_data["step"] = None
            context.chat_data["inv"] = None

    async def _begin_inventory(self, message, user, context):
        """Create a new Item for *user* and ask for the object name."""
        item = await Item.objects.acreate(user_id=user.id)
        await message.reply_markdown_v2(
            (
                f"Специалист {_md(user.name or user.full_name)}, ID {user.id}\n"
                f"Начинаем инвентаризацию `#{item.id}`\n"
                f"Введите название объекта"
            ),
            reply_parameters=_reply_to(message),
        )
        context.chat_data["inv"] = item.id
        context.chat_data["step"] = "name"

    async def _save_object_name(self, message, context):
        """Store the object name and offer the CustomTable names as buttons."""
        item = await Item.objects.aget(id=context.chat_data["inv"])
        item.name = message.text
        await item.asave()

        tmc_names = [table.name async for table in CustomTable.objects.all()]
        await message.reply_markdown_v2(
            (
                f"Инвентаризация `#{item.id}`\n"
                f"Название объекта `{_md_code(item.name)}`\n"
            ),
            reply_parameters=_reply_to(message),
            reply_markup=_keyboard(tmc_names, 3),
        )
        logger.info(tmc_names)
        context.chat_data["step"] = "add_tmc"

    async def _select_tmc(self, message, context):
        """Attach the chosen CustomTable and offer its field names as buttons."""
        tmc = await CustomTable.objects.aget(name=message.text)
        item = await Item.objects.aget(id=context.chat_data["inv"])
        await item.tmc.aadd(tmc)

        field_names = [field.name async for field in tmc.fields.all()]
        await message.reply_markdown_v2(
            (
                f"Инвентаризация `#{item.id}`\n"
                f"Название объекта `{_md_code(item.name)}`\n"
                f"Название ТМЦ `{_md_code(tmc.name)}`\n"
                f"Что вы загружаете?"
            ),
            reply_parameters=_reply_to(message),
            reply_markup=_keyboard(field_names, 1),
        )
        context.chat_data["tmc"] = tmc.id
        context.chat_data["step"] = "add_field"

    async def _select_field(self, message, context):
        """Create an Element named after the message text and ask for its data."""
        tmc = await CustomTable.objects.aget(id=context.chat_data["tmc"])
        element = await Element.objects.acreate(name=message.text, tmc=tmc)
        item = await Item.objects.aget(id=context.chat_data["inv"])
        await item.element.aadd(element)

        await message.reply_markdown_v2(
            (
                f"Инвентаризация `#{item.id}`\n"
                f"Название объекта `{_md_code(item.name)}`\n"
                f"Название ТМЦ `{_md_code(tmc.name)}`\n"
                f"Название элемента `{_md_code(element.name)}`\n"
                f"Загрузите фото или пришлите текст"
            ),
            reply_parameters=_reply_to(message),
            reply_markup=ReplyKeyboardRemove(),
        )
        context.chat_data["element"] = element.id
        context.chat_data["step"] = "add_field_data"

    async def _save_field_data(self, message, context):
        """Store the received photo file id or text on the current Element."""
        element = await Element.objects.aget(id=context.chat_data["element"])
        if message.photo:
            # The last entry of message.photo is the one that gets stored.
            element.photoid = message.photo[-1].file_id
        elif message.text:
            element.text = message.text
        await element.asave()

    async def error(self, update: Update, context: CallbackContext):
        """Log a handler error and put the update back on the shared queue."""
        logger.info(f"error in tgbot {context.error}\nReply update")
        # NOTE(review): presumably my_queue feeds updates back into the bot;
        # if so, an update that fails deterministically (e.g. an unknown TMC
        # name raising DoesNotExist) may be retried forever - confirm.
        TgBotClass.my_queue.put(update)

    async def set_webhook(self, url):
        """Register *url* as the bot webhook; logs and returns if no app exists."""
        app = self._app
        if not app:
            logger.error("no app")
            return
        await app.bot.set_webhook(url, allowed_updates=Update.ALL_TYPES)

    async def init_tg(self):
        """Build, configure, initialize and start the Application.

        Returns the started ``Application``. No updater is attached
        (``updater(None)``), so updates must be supplied from outside.
        """
        app = (
            Application.builder()
            .token(settings.TGBOT["token"])
            .concurrent_updates(True)
            .updater(None)
            .build()
        )
        self._app = app

        private = filters.ChatType.PRIVATE
        app.add_handler(CommandHandler("start", self.start, private))
        app.add_handler(CommandHandler("my", self.my, private))
        app.add_handler(CommandHandler("inv", self.inv, private))
        # Every other private message is treated as input to the dialogue.
        app.add_handler(MessageHandler(private, self.inv))
        app.add_error_handler(self.error)

        logger.info(
            {
                "app": app,
                "bot": app.bot,
                "handlers": app.handlers,
            }
        )
        await app.initialize()
        await app.start()
        return app