import traceback
import html
import json
import logging
import re

import more_itertools as mit
from telegram import (
    Update,
    ReplyParameters,
    ReplyKeyboardMarkup,
    ReplyKeyboardRemove,
    KeyboardButton,
)
from telegram.ext import (
    ApplicationBuilder,
    CommandHandler,
    MessageHandler,
    CallbackQueryHandler,
    CallbackContext,
    ContextTypes,
    filters,
)
from telegram.constants import ParseMode
from telegram.helpers import escape_markdown

from django.conf import settings

from .apps import TgBotUpdater
from .models import Element, TgItem
from tmc.models import CustomTable

logger = logging.getLogger(__name__)


def chunk(n, l):
    """Split the sequence ``l`` into consecutive slices of at most ``n`` items."""
    return [l[i : i + n] for i in range(0, len(l), n)]


def _keyboard(labels, per_row):
    """Build a reply keyboard with ``per_row`` buttons per row from ``labels``."""
    return ReplyKeyboardMarkup(
        [[KeyboardButton(label) for label in row] for row in chunk(per_row, labels)]
    )


def _code(value):
    """Escape ``value`` for use inside a MarkdownV2 inline-code span."""
    # Inside `...` only the backtick and the backslash are special.
    return escape_markdown(str(value), version=2, entity_type="code")


class TgBot:
    """Telegram inventory bot driven by a per-chat step stored in ``chat_data``.

    The conversation state lives in ``context.chat_data`` under the keys
    ``step``, ``inv``, ``tmc`` and ``element``.
    """

    # Kept for backward compatibility; the application object itself is
    # published as ``TgBot.app`` by ``__init__``.
    _app = None

    def __init__(self) -> None:
        """Read the bot settings and build the shared application.

        Raises:
            Exception: if the token or the base URL is missing from settings.
        """
        self.token = settings.TGBOT["TOKEN"]
        self.baseurl = settings.TGBOT["BASE_URL"]
        self.webhook = settings.TGBOT["WEBHOOK_URL"]
        if not self.token or not self.baseurl:
            raise Exception("no token or baseurl")
        # updater(None): updates are fed to the application from outside.
        TgBot.app = ApplicationBuilder().token(self.token).updater(None).build()

    async def set_webhook(self):
        """Register the webhook URL with Telegram, dropping pending updates."""
        await TgBot.app.bot.set_webhook(
            f"https://{self.baseurl}/{self.webhook}/", drop_pending_updates=True
        )

    async def start_app(self):
        """Initialize the shared application."""
        await TgBot.app.initialize()

    async def admin_action(self, name, queryset):
        """Placeholder for admin actions; currently does nothing."""
        pass

    async def set_handlers(self):
        """Register command, message and error handlers (private chats only)."""
        private = filters.ChatType.PRIVATE
        TgBot.app.add_handler(CommandHandler("start", self.start, private))
        TgBot.app.add_handler(CommandHandler("my", self.my, private))
        TgBot.app.add_handler(CommandHandler("inv", self.inv, private))
        # Every other private message is routed to the step machine.
        TgBot.app.add_handler(MessageHandler(private, self.inv))
        TgBot.app.add_error_handler(self.error_handler)

    async def start(self, update: Update, context: CallbackContext):
        """Reply to /start with a short help text."""
        await update.message.reply_markdown_v2(
            (
                "Это бот для проведения инвентаризации\n"
                # "-" is reserved in MarkdownV2, hence the escaped backslashes.
                "/my \\-\\- продолжить инвентаризацию\n"
                "/inv \\-\\- начать новую инвентаризацию"
            ),
            reply_parameters=ReplyParameters(message_id=update.message.message_id),
        )

    async def my(self, update: Update, context: CallbackContext):
        """List the user's inventories as keyboard buttons.

        When at least one inventory exists, the chat moves to the ``get_inv``
        step so the next message is treated as the chosen inventory name.
        """
        user = update.effective_user
        current_step = context.chat_data.get("step", None)
        logger.info("Step %s from user %s", current_step, user.full_name)

        names = [item.name async for item in TgItem.objects.filter(user_id=user.id)]
        await update.message.reply_markdown_v2(
            "Ваши инвентаризации",
            reply_markup=_keyboard(names, 1),
            reply_parameters=ReplyParameters(message_id=update.message.message_id),
        )
        if names:
            context.chat_data["step"] = "get_inv"

    async def inv(self, update: Update, context: CallbackContext):
        """Advance the inventory conversation by one step.

        Steps: ``get_inv`` -> ``name`` -> ``add_tmc`` -> ``add_field`` ->
        ``add_field_data``. A handled message that does not move the chat to
        a different step ends the session (``step`` and ``inv`` are cleared).

        Returns:
            ``True`` when the update carries no message, otherwise ``None``.
        """
        user = update.effective_user
        current_step = context.chat_data.get("step", None)
        if not update.message:
            return True
        message = update.message
        reply_to = ReplyParameters(message_id=message.message_id)

        logger.info(message)
        logger.info("Step %s from user %s", current_step, user.full_name)

        if current_step == "get_inv":
            # Restrict the lookup to the sender's own inventories, matching
            # what /my offered on the keyboard.
            item = await TgItem.objects.aget(name=message.text, user_id=user.id)
            context.chat_data["inv"] = item.id
            context.chat_data["step"] = "name"
            # Fall through to the "name" branch below with the chosen name.
            current_step = context.chat_data["step"]

        if not current_step and message.text == "/inv":
            item = await TgItem.objects.acreate(user_id=user.id)
            # User-supplied names may contain MarkdownV2 reserved characters.
            specialist = escape_markdown(user.name or user.full_name, version=2)
            await message.reply_markdown_v2(
                (
                    f"Специалист {specialist}, ID {user.id}\n"
                    f"Начинаем инвентаризацию `#{item.id}`\n"
                    f"Введите название объекта"
                ),
                reply_parameters=reply_to,
            )
            context.chat_data["inv"] = item.id
            context.chat_data["step"] = "name"
        elif current_step == "name":
            item = await TgItem.objects.aget(id=context.chat_data["inv"])
            item.name = message.text
            await item.asave()
            tmc_names = [table.name async for table in CustomTable.objects.all()]
            await message.reply_markdown_v2(
                (
                    f"Инвентаризация `#{item.id}`\n"
                    f"Название объекта `{_code(item.name)}`\n"
                    f"Количество ТМЦ {await item.tmc.acount()}"
                ),
                reply_parameters=reply_to,
                reply_markup=_keyboard(tmc_names, 3),
            )
            logger.info(tmc_names)
            context.chat_data["step"] = "add_tmc"
        elif current_step == "add_tmc":
            tmc = await CustomTable.objects.aget(name=message.text)
            item = await TgItem.objects.aget(id=context.chat_data["inv"])
            await item.tmc.aadd(tmc)
            field_names = [field.name async for field in tmc.fields.all()]
            await message.reply_markdown_v2(
                (
                    f"Инвентаризация `#{item.id}`\n"
                    f"Название объекта `{_code(item.name)}`\n"
                    f"Название ТМЦ `{_code(tmc.name)}`\n"
                    f"Что вы загружаете?"
                ),
                reply_parameters=reply_to,
                reply_markup=_keyboard(field_names, 1),
            )
            context.chat_data["tmc"] = tmc.id
            context.chat_data["step"] = "add_field"
        elif current_step == "add_field":
            tmc = await CustomTable.objects.aget(id=context.chat_data["tmc"])
            element = await Element.objects.acreate(name=message.text, tmc=tmc)
            item = await TgItem.objects.aget(id=context.chat_data["inv"])
            await item.element.aadd(element)
            await message.reply_markdown_v2(
                (
                    f"Инвентаризация `#{item.id}`\n"
                    f"Название объекта `{_code(item.name)}`\n"
                    f"Название ТМЦ `{_code(tmc.name)}`\n"
                    f"Название элемента `{_code(element.name)}`\n"
                    f"Загрузите фото или пришлите текст"
                ),
                reply_parameters=reply_to,
                reply_markup=ReplyKeyboardRemove(),
            )
            context.chat_data["element"] = element.id
            context.chat_data["step"] = "add_field_data"
        elif current_step == "add_field_data":
            item = await TgItem.objects.aget(id=context.chat_data["inv"])
            # ``item.tmc`` / ``item.element`` are related managers, not ids;
            # the ids of the objects being edited were stored in chat_data by
            # the previous steps.
            tmc = await CustomTable.objects.aget(id=context.chat_data["tmc"])
            element = await Element.objects.aget(id=context.chat_data["element"])
            if message.photo:
                # The last PhotoSize in the list is used.
                element.photoid = message.photo[-1].file_id
            elif message.text:
                element.text = message.text
            await element.asave()
            await message.reply_markdown_v2(
                (
                    f"Инвентаризация `#{item.id}`\n"
                    f"Название объекта `{_code(item.name)}`\n"
                    f"Название ТМЦ `{_code(tmc.name)}`\n"
                    f"Название элемента `{_code(element.name)}`\n"
                    f"Данные загружены"
                ),
                reply_parameters=reply_to,
                reply_markup=ReplyKeyboardRemove(),
            )
            context.chat_data["element"] = element.id
            context.chat_data["step"] = "add_field_data"
        else:
            logger.info(message.entities)
            logger.info("no step for update %s", update)

        # The step did not change while handling this message: end the session.
        if "step" in context.chat_data and context.chat_data["step"] == current_step:
            context.chat_data["step"] = None
            context.chat_data["inv"] = None

    async def error_handler(
        self, update: object, context: ContextTypes.DEFAULT_TYPE
    ) -> None:
        """Log the error and put the failed update back on the updater queue."""
        # Log the error before we do anything else, so we can see it even if something breaks.
        logger.error("Exception while handling an update:", exc_info=context.error)

        # traceback.format_exception returns the usual python message about an exception, but as a
        # list of strings rather than a single string, so we have to join them together.
        tb_list = traceback.format_exception(
            None, context.error, context.error.__traceback__
        )
        tb_string = "".join(tb_list)

        # Build the message with some markup and additional information about what happened.
        # You might need to add some logic to deal with messages longer than the 4096 character limit.
        # NOTE: the message is only assembled here; nothing in this handler sends it.
        update_str = update.to_dict() if isinstance(update, Update) else str(update)
        message = (
            "An exception was raised while handling an update\n"
            f"<pre>update = {html.escape(json.dumps(update_str, indent=2, ensure_ascii=False))}"
            "</pre>\n\n"
            f"<pre>context.chat_data = {html.escape(str(context.chat_data))}</pre>\n\n"
            f"<pre>context.user_data = {html.escape(str(context.user_data))}</pre>\n\n"
            f"<pre>{html.escape(tb_string)}</pre>"
        )

        logger.error(context.error)
        logger.info("error in tgbot %s\nReply update", context.error)
        # NOTE(review): the failed update is re-queued as-is; an update that
        # fails deterministically would presumably be retried forever —
        # confirm that the consumer of my_queue guards against that.
        TgBotUpdater.my_queue.put(update)