import traceback import html import json import logging import re import more_itertools as mit from telegram import ( Update, ReplyParameters, ReplyKeyboardMarkup, ReplyKeyboardRemove, KeyboardButton, InlineKeyboardMarkup, InlineKeyboardButton, ) from telegram.ext import ( ApplicationBuilder, CommandHandler, MessageHandler, CallbackQueryHandler, CallbackContext, ContextTypes, filters, ) from telegram.constants import ParseMode from django.conf import settings from django.urls import reverse from .apps import TgBotUpdater from .models import TgItem, TmcElement, TmcField from tmc.models import CustomTable, BaseCustomField logger = logging.getLogger(__name__) def chunk(n, l): return [l[i : i + n] for i in range(0, len(l), n)] class TgBot: _app = None def __init__(self) -> None: self.token = settings.TGBOT["TOKEN"] self.baseurl = settings.TGBOT["BASE_URL"] self.webhook = settings.TGBOT["WEBHOOK_URL"] if not self.token or not self.baseurl: raise Exception("no token or baseurl") TgBot.app = ( ApplicationBuilder().token(settings.TGBOT["TOKEN"]).updater(None).build() ) async def set_webhook(self): await TgBot.app.bot.setWebhook( f"https://{self.baseurl}{reverse('tgitem-send-tg-data')}", drop_pending_updates=True ) async def start_app(self): await TgBot.app.initialize() async def admin_action(self, name, queryset): if name == 'admin_get_image': async for item in queryset: try: text = await TgBot.app.bot.get_file(item.text) TgBotUpdater.return_values[item.text] = text.file_path except Exception as e: TgBotUpdater.return_values[item.text] = None if name == 'admin_get_name': item = queryset[0] try: text = await TgBot.app.bot.get_chat(item) TgBotUpdater.return_values[item] = text.effective_name except Exception as e: TgBotUpdater.return_values[item] = None async def set_handlers(self): TgBot.app.add_handler( CommandHandler("start", self.start, filters.ChatType.PRIVATE) ) TgBot.app.add_handler(CommandHandler("my", self.my, filters.ChatType.PRIVATE)) TgBot.app.add_handler(CommandHandler("inv", self.inv, filters.ChatType.PRIVATE)) TgBot.app.add_handler(MessageHandler(filters.ChatType.PRIVATE, self.inv)) TgBot.app.add_handler(CallbackQueryHandler(self.get_inv, r"get_inv@(.*?)")) TgBot.app.add_handler(CallbackQueryHandler(self.add_tmc, r"add_tmc@(.*?)")) TgBot.app.add_handler( CallbackQueryHandler(self.add_element, r"add_element@(.*?)") ) TgBot.app.add_error_handler(self.error_handler) async def start(self, update: Update, context: CallbackContext): await update.message.reply_markdown_v2( ( "Это бот для проведения инвентаризации\n" "/my \-\- продолжить инвентаризацию\n" "/inv \-\- начать новую инвентаризацию" ), # reply_markup=ForceReply(selective=True), reply_parameters=ReplyParameters(message_id=update.message.message_id), ) async def my(self, update: Update, context: CallbackContext): user = update.effective_user current_step = context.chat_data.get("step", None) logger.info(f"Step {current_step} from user {user.full_name}") inv = [] async for e in TgItem.objects.filter(user_id=user.id): inv.append({"name": e.name, "id": str(e.id)}) keys = chunk(1, inv) if len(inv) > 0: await update.message.reply_markdown_v2( ("Ваши инвентаризации"), reply_markup=InlineKeyboardMarkup( [ [ InlineKeyboardButton( i["name"], callback_data=f'get_inv@{i["id"]}', ) for i in arr ] for arr in keys ] ), reply_parameters=ReplyParameters(message_id=update.message.message_id), ) else: await update.message.reply_markdown_v2( "У вас нет доступных для редактирования инвентаризаций" ) async def format_text(self, user, inv=None, tmc=None): text = ( f"Специалист {user.full_name}, ID {user.id}\n" f"Введите название объекта" ), async def get_inv(self, update: Update, context: CallbackContext): query = update.callback_query await update.effective_message.edit_reply_markup(InlineKeyboardMarkup([])) await query.answer() inv_id = query.data.split("@")[-1] inv = await TgItem.objects.aget(id=inv_id) context.chat_data["inv"] = inv.id context.chat_data["step"] = "name" await self.inv(update, context) async def add_tmc(self, update: Update, context: CallbackContext): query = update.callback_query if update.callback_query: await update.effective_message.edit_reply_markup(InlineKeyboardMarkup([])) await query.answer() tmc_id = query.data.split("@")[-1] tmc = await CustomTable.objects.aget(id=tmc_id) tmc_element = await TmcElement.objects.acreate(tmc=tmc) inv = await TgItem.objects.aget(id=context.chat_data["inv"]) async for f in tmc.fields.all(): tmc_field = await TmcField.objects.acreate(field=f) await tmc_element.field.aadd(tmc_field) await inv.tmc.aadd(tmc_element) context.chat_data["tmc"] = tmc_element.id context.chat_data["step"] = "add_tmc" await self.inv(update, context) async def add_element(self, update: Update, context: CallbackContext): query = update.callback_query if update.callback_query: await update.effective_message.edit_reply_markup(InlineKeyboardMarkup([])) await query.answer() inv = await TgItem.objects.aget(id=context.chat_data["inv"]) tmc = await inv.tmc.aget(id=context.chat_data["tmc"]) field_id = query.data.split("@")[-1] tmc_field = await tmc.field.aget(id=field_id) context.chat_data["element"] = tmc_field.id context.chat_data["step"] = "add_element" await self.inv(update, context) async def inv(self, update: Update, context: CallbackContext): user = update.effective_user current_step = context.chat_data.get("step", None) logger.info( f"Step {current_step} from user {user.full_name} in {update.message}" ) if update.effective_message.text == "/inv": context.chat_data["step"] = "name" await update.effective_message.reply_markdown_v2( ( f"Специалист {user.name or user.full_name}, ID {user.id}\n" f"Введите название объекта" ), reply_parameters=ReplyParameters( message_id=update.effective_message.message_id ), ) elif current_step == "name": if not context.chat_data.get("inv", None): inv = await TgItem.objects.acreate(user_id=user.id) inv.name = update.message.text await inv.asave() else: inv = await TgItem.objects.aget(id=context.chat_data["inv"]) tmc = [] async for e in CustomTable.objects.all(): tmc.append({"name": e.name, "id": e.id}) keys = chunk(2, tmc) context.chat_data["inv"] = inv.id context.chat_data["step"] = "add_tmc" await update.effective_message.reply_markdown_v2( ( f"Инвентаризация `{inv.name}` от `{inv.created_at.strftime('%x')}`\n" f"Количество ТМЦ {await inv.tmc.acount()}\n\n" "Выберите, какую ТМЦ вы осматриваете:" ), reply_parameters=ReplyParameters( message_id=update.effective_message.message_id ), reply_markup=InlineKeyboardMarkup( [ [ InlineKeyboardButton( i["name"], callback_data=f'{context.chat_data["step"]}@{i["id"]}', ) for i in arr ] for arr in keys ] ), ) elif current_step == "add_tmc": inv = await TgItem.objects.aget(id=context.chat_data["inv"]) tmc_element = await inv.tmc.aget(id=context.chat_data["tmc"]) tmc = await CustomTable.objects.aget(id=tmc_element.tmc_id) fields = [] async for e in tmc_element.field.filter(text=None): f = await BaseCustomField.objects.aget(id=e.field_id) fields.append({"name": f.name, "id": e.id}) keys = chunk(1, fields) context.chat_data["tmc"] = tmc_element.id context.chat_data["step"] = "add_element" await update.effective_message.reply_markdown_v2( ( f"Инвентаризация `{inv.name}` от `{inv.created_at.strftime('%x')}`\n" f"Количество ТМЦ {await inv.tmc.acount()}\n\n" f"Название ТМЦ `{tmc.name}`\n" f"Что вы загружаете?" ), reply_parameters=ReplyParameters( message_id=update.effective_message.message_id ), reply_markup=InlineKeyboardMarkup( [ [ InlineKeyboardButton( i["name"], callback_data=f'{context.chat_data["step"]}@{i["id"]}', ) for i in arr ] for arr in keys ] ), ) elif current_step == "add_element": inv = await TgItem.objects.aget(id=context.chat_data["inv"]) tmc = await inv.tmc.aget(id=context.chat_data["tmc"]) element = await TmcField.objects.aget(id=context.chat_data["element"]) field = await BaseCustomField.objects.aget(id=element.field_id) context.chat_data["step"] = "add_element_data" await update.effective_message.reply_markdown_v2( ( f"Инвентаризация `{inv.name}` от `{inv.created_at.strftime('%x')}`\n" f"Количество ТМЦ {await inv.tmc.acount()}\n\n" f"Название ТМЦ `{tmc.name}`\n" f"Элемент `{field.name}`\n" f"Загрузите фото или пришлите текст" ), reply_parameters=ReplyParameters( message_id=update.effective_message.message_id ), reply_markup=ReplyKeyboardRemove(), ) elif current_step == "add_element_data": inv = await TgItem.objects.aget(id=context.chat_data["inv"]) tmc_element = await inv.tmc.aget(id=context.chat_data["tmc"]) tmc = await CustomTable.objects.aget(id=tmc_element.tmc_id) element = await TmcField.objects.aget(id=context.chat_data["element"]) field = await BaseCustomField.objects.aget(id=element.field_id) if update.message.photo: element.text = update.message.photo[-1].file_id elif update.message.text: element.text = update.message.text await element.asave() empty_fields = await tmc_element.field.filter(text=None).acount() if empty_fields > 0: context.chat_data["step"] = "add_tmc" await self.inv(update, context) else: context.chat_data["step"] = "name" await self.inv(update, context) else: logger.info(update.message.entities) logger.info(f"no step for update {update}") if "step" in context.chat_data and context.chat_data["step"] == current_step: context.chat_data["step"] = None context.chat_data["inv"] = None async def error_handler( self, update: object, context: ContextTypes.DEFAULT_TYPE ) -> None: """Log the error and send a telegram message to notify the developer.""" # Log the error before we do anything else, so we can see it even if something breaks. # logger.error("Exception while handling an update:", exc_info=context.error) # traceback.format_exception returns the usual python message about an exception, but as a # list of strings rather than a single string, so we have to join them together. tb_list = traceback.format_exception( None, context.error, context.error.__traceback__ ) tb_string = "".join(tb_list) # Build the message with some markup and additional information about what happened. # You might need to add some logic to deal with messages longer than the 4096 character limit. update_str = update.to_dict() if isinstance(update, Update) else str(update) message = ( "An exception was raised while handling an update\n" f"
update = {html.escape(json.dumps(update_str, indent=2, ensure_ascii=False))}"
            "
\n\n" f"
context.chat_data = {html.escape(str(context.chat_data))}
\n\n" f"
context.user_data = {html.escape(str(context.user_data))}
\n\n" f"
{html.escape(tb_string)}
" ) # logger.error(context.error) logger.info(f"error in tgbot {context.error}\n{tb_string}\nReply update")