import traceback
import html
import json
import logging
import re

import more_itertools as mit
from telegram import (
    Update,
    ReplyParameters,
    ReplyKeyboardMarkup,
    ReplyKeyboardRemove,
    KeyboardButton,
    InlineKeyboardMarkup,
    InlineKeyboardButton,
)
from telegram.ext import (
    ApplicationBuilder,
    CommandHandler,
    MessageHandler,
    CallbackQueryHandler,
    CallbackContext,
    ContextTypes,
    filters,
)
from telegram.constants import ParseMode
from telegram.helpers import escape_markdown

from django.conf import settings
from django.urls import reverse
from django.db import models

from .apps import TgBotUpdater
from .models import TgItem, TmcElement, TmcField
from tmc.models import CustomTable, BaseCustomField

logger = logging.getLogger(__name__)


def chunk(n, l):
    """Split sequence ``l`` into consecutive slices of at most ``n`` items."""
    return [l[i : i + n] for i in range(0, len(l), n)]


def _md(text):
    """Escape ``text`` for use as plain MarkdownV2 message text."""
    return escape_markdown(str(text), version=2)


def _md_code(text):
    """Escape ``text`` for use inside a MarkdownV2 inline-code (backtick) span."""
    return escape_markdown(str(text), version=2, entity_type="code")


def _inline_keyboard(items, prefix, width):
    """Build inline-keyboard rows of ``width`` buttons from ``items``.

    Each item is a dict with ``"name"`` (button label) and ``"id"``; the
    button's callback data is ``"<prefix>@<id>"``.
    """
    return [
        [
            InlineKeyboardButton(
                item["name"],
                callback_data=f'{prefix}@{item["id"]}',
            )
            for item in row
        ]
        for row in chunk(width, items)
    ]


class TgBot:
    """Telegram bot that walks a user through an inventory dialogue.

    The dialogue state lives in ``context.chat_data`` under the keys
    ``"step"``, ``"inv"``, ``"tmc"`` and ``"element"``. The telegram
    application object is stored on the class as ``TgBot.app``.
    """

    # NOTE(review): never read in this file; the methods use ``TgBot.app``.
    _app = None

    def __init__(self) -> None:
        """Read the bot settings and build the shared application.

        Raises:
            Exception: if the token or the base URL is empty.
        """
        self.token = settings.TGBOT["TOKEN"]
        self.baseurl = settings.TGBOT["BASE_URL"]
        self.webhook = settings.TGBOT["WEBHOOK_URL"]
        if not self.token or not self.baseurl:
            raise Exception("no token or baseurl")
        # updater(None): no Updater is built for this application.
        TgBot.app = ApplicationBuilder().token(self.token).updater(None).build()

    async def set_webhook(self):
        """Register the webhook URL with Telegram, dropping pending updates."""
        await TgBot.app.bot.set_webhook(
            f"https://{self.baseurl}{reverse('tgitem-send-tg-data')}",
            drop_pending_updates=True,
        )

    async def start_app(self):
        """Initialize the shared application."""
        await TgBot.app.initialize()

    async def admin_action(self, name, queryset):
        """Run an admin lookup and store its result in ``TgBotUpdater.return_values``.

        Args:
            name: ``"admin_get_image"`` (file path lookup) or
                ``"admin_get_name"`` (chat name lookup).
            queryset: despite the name, used as a single id that is passed to
                the bot call and used as the key of the stored result.

        On any failure the stored value is ``None``.
        """
        item = queryset
        if name == "admin_get_image":
            try:
                result = await TgBot.app.bot.get_file(item)
                TgBotUpdater.return_values[item] = result.file_path
            except Exception:
                # Best effort: the caller gets None, but the reason is logged.
                logger.warning("admin_get_image failed for %r", item, exc_info=True)
                TgBotUpdater.return_values[item] = None
        if name == "admin_get_name":
            try:
                result = await TgBot.app.bot.get_chat(item)
                TgBotUpdater.return_values[item] = result.effective_name
            except Exception:
                logger.warning("admin_get_name failed for %r", item, exc_info=True)
                TgBotUpdater.return_values[item] = None

    async def set_handlers(self):
        """Register command, message, callback-query and error handlers."""
        app = TgBot.app
        app.add_handler(CommandHandler("start", self.start, filters.ChatType.PRIVATE))
        app.add_handler(CommandHandler("my", self.my, filters.ChatType.PRIVATE))
        app.add_handler(CommandHandler("inv", self.inv, filters.ChatType.PRIVATE))
        # Any non-command text or photo in a private chat feeds the dialogue.
        app.add_handler(
            MessageHandler(
                (
                    filters.ChatType.PRIVATE
                    & ~filters.COMMAND
                    & (filters.TEXT | filters.PHOTO)
                ),
                self.inv,
            )
        )
        app.add_handler(CallbackQueryHandler(self.stop_inv, "stop_inv"))
        app.add_handler(CallbackQueryHandler(self.get_inv, r"get_inv@(.*?)"))
        app.add_handler(CallbackQueryHandler(self.add_tmc, r"add_tmc@(.*?)"))
        app.add_handler(CallbackQueryHandler(self.add_element, r"add_element@(.*?)"))
        app.add_error_handler(self.error_handler)

    async def start(self, update: Update, context: CallbackContext):
        """Reply to ``/start`` with a short help text."""
        await update.message.reply_markdown_v2(
            (
                "Это бот для проведения инвентаризации\n"
                "/my \\-\\- продолжить инвентаризацию\n"
                "/inv \\-\\- начать новую инвентаризацию"
            ),
            reply_parameters=ReplyParameters(message_id=update.message.message_id),
        )

    async def my(self, update: Update, context: CallbackContext):
        """Reply to ``/my`` with one button per inventory of the current user."""
        user = update.effective_user
        current_step = context.chat_data.get("step", None)
        logger.info(f"Step {current_step} from user {user.full_name}")
        inventories = [
            {"name": e.name, "id": str(e.id)}
            async for e in TgItem.objects.filter(user_id=user.id)
        ]
        if inventories:
            await update.message.reply_markdown_v2(
                ("Ваши инвентаризации"),
                reply_markup=InlineKeyboardMarkup(
                    _inline_keyboard(inventories, "get_inv", 1)
                ),
                reply_parameters=ReplyParameters(message_id=update.message.message_id),
            )
        else:
            await update.message.reply_markdown_v2(
                "У вас нет доступных для редактирования инвентаризаций"
            )

    def format_username(self, user):
        """Return the MarkdownV2 line describing the user."""
        # Names may contain MarkdownV2-reserved characters (e.g. "_" in
        # "@user_name"), which Telegram rejects unless escaped.
        return f"Специалист {_md(user.name or user.full_name)}, ID {user.id}"

    def format_inv(self, inv):
        """Return the MarkdownV2 line with the inventory name and creation date."""
        created = inv.created_at.strftime("%x")
        return f"Инвентаризация `{_md_code(inv.name)}` от `{_md_code(created)}`"

    async def get_tmc_count(self, inv):
        """Return ``{"tmc__name", "count"}`` rows for the elements of ``inv``."""
        return [
            row
            async for row in inv.tmc.values("tmc__name").annotate(
                count=models.Count("tmc__name")
            )
        ]

    def format_tmc_count(self, tmc_count):
        """Return the MarkdownV2 summary line built from ``get_tmc_count`` rows."""
        res = ["Всего ТМЦ:"]
        res.append(
            ", ".join(
                [f"{_md(e['tmc__name'])} \\({e['count']}\\)" for e in tmc_count]
            )
        )
        return " ".join(res)

    def format_tmc_name(self, tmc):
        """Return the MarkdownV2 line with the TMC name."""
        return f"Название ТМЦ `{_md_code(tmc.name)}`"

    def format_element(self, field):
        """Return the MarkdownV2 line with the field name."""
        return f"Элемент `{_md_code(field.name)}`"

    def stop_inv_button(self):
        """Return a keyboard row holding the single "stop inventory" button."""
        return [
            InlineKeyboardButton(
                "❌❌❌ Остановить инвентаризацию", callback_data="stop_inv"
            )
        ]

    async def _dismiss_keyboard(self, update):
        """Remove the inline keyboard of the pressed message and answer the query."""
        await update.effective_message.edit_reply_markup(InlineKeyboardMarkup([]))
        await update.callback_query.answer()

    async def stop_inv(self, update: Update, context: CallbackContext):
        """Handle the stop button: drop the whole dialogue state."""
        await self._dismiss_keyboard(update)
        context.chat_data.clear()

    async def get_inv(self, update: Update, context: CallbackContext):
        """Handle ``get_inv@<id>``: resume the chosen inventory."""
        query = update.callback_query
        await self._dismiss_keyboard(update)
        inv_id = query.data.split("@")[-1]
        inv = await TgItem.objects.aget(id=inv_id)
        context.chat_data["inv"] = inv.id
        context.chat_data["step"] = "name"
        await self.inv(update, context)

    async def add_tmc(self, update: Update, context: CallbackContext):
        """Handle ``add_tmc@<id>``: add a new element of that TMC to the inventory.

        One empty ``TmcField`` is created per field of the TMC.
        """
        query = update.callback_query
        if query:
            await self._dismiss_keyboard(update)
        tmc_id = query.data.split("@")[-1]
        tmc = await CustomTable.objects.aget(id=tmc_id)
        tmc_element = await TmcElement.objects.acreate(tmc=tmc)
        inv = await TgItem.objects.aget(id=context.chat_data["inv"])
        async for f in tmc.fields.all():
            tmc_field = await TmcField.objects.acreate(field=f)
            await tmc_element.field.aadd(tmc_field)
        await inv.tmc.aadd(tmc_element)
        await inv.asave()
        context.chat_data["tmc"] = tmc_element.id
        context.chat_data["step"] = "add_tmc"
        await self.inv(update, context)

    async def add_element(self, update: Update, context: CallbackContext):
        """Handle ``add_element@<id>``: select the field the user will fill next."""
        query = update.callback_query
        if query:
            await self._dismiss_keyboard(update)
        inv = await TgItem.objects.aget(id=context.chat_data["inv"])
        tmc = await inv.tmc.aget(id=context.chat_data["tmc"])
        field_id = query.data.split("@")[-1]
        tmc_field = await tmc.field.aget(id=field_id)
        context.chat_data["element"] = tmc_field.id
        context.chat_data["step"] = "add_element"
        await self.inv(update, context)

    async def inv(self, update: Update, context: CallbackContext):
        """Advance the inventory dialogue by one step.

        Dispatches on the ``/inv`` command or on ``chat_data["step"]``
        (``name`` -> ``add_tmc`` -> ``add_element`` -> ``add_element_data``).
        An unknown step resets ``step`` and ``inv`` to ``None``.
        """
        user = update.effective_user
        current_step = context.chat_data.get("step", None)
        logger.info(
            f"Step {current_step} from user {user.full_name} in {update.message}"
        )
        if update.effective_message.text == "/inv":
            await self._start_inv(update, context, user)
        elif current_step == "name":
            await self._step_name(update, context, user)
        elif current_step == "add_tmc":
            await self._step_add_tmc(update, context)
        elif current_step == "add_element":
            await self._step_add_element(update, context)
        elif current_step == "add_element_data":
            await self._step_add_element_data(update, context)
        else:
            logger.info(update.message.entities)
            logger.info(f"no step for update {update}")
            # Reset only here. Resetting whenever the step is unchanged also
            # fired for "/inv" sent while already at "name", which wiped the
            # step right after prompting for a name.
            if "step" in context.chat_data:
                context.chat_data["step"] = None
                context.chat_data["inv"] = None

    async def _start_inv(self, update, context, user):
        """Begin a new inventory: clear stale ids and ask for the object name."""
        # "/inv" means "start a new inventory": a leftover "inv" id would make
        # the "name" step reopen the old inventory and ignore the typed name.
        for key in ("inv", "tmc", "element"):
            context.chat_data.pop(key, None)
        context.chat_data["step"] = "name"
        await update.effective_message.reply_markdown_v2(
            "\n".join([self.format_username(user), "Введите название объекта"]),
            reply_parameters=ReplyParameters(
                message_id=update.effective_message.message_id
            ),
        )

    async def _step_name(self, update, context, user):
        """Create or load the inventory and ask which TMC is being inspected."""
        inv_id = context.chat_data.get("inv", None)
        if not inv_id:
            inv = await TgItem.objects.acreate(user_id=user.id)
            inv.name = update.message.text
            await inv.asave()
        else:
            inv = await TgItem.objects.aget(id=inv_id)
        tmc = [{"name": e.name, "id": e.id} async for e in CustomTable.objects.all()]
        context.chat_data["inv"] = inv.id
        context.chat_data["step"] = "add_tmc"
        text = "\n".join(
            [
                self.format_inv(inv),
                self.format_tmc_count(await self.get_tmc_count(inv)),
                "Выберите, какую ТМЦ вы осматриваете:",
            ]
        )
        keyboard = _inline_keyboard(tmc, context.chat_data["step"], 2)
        keyboard.append(self.stop_inv_button())
        await update.effective_message.reply_markdown_v2(
            text,
            reply_parameters=ReplyParameters(
                message_id=update.effective_message.message_id
            ),
            reply_markup=InlineKeyboardMarkup(keyboard),
        )

    async def _step_add_tmc(self, update, context):
        """Ask which still-empty field of the current element will be filled."""
        inv = await TgItem.objects.aget(id=context.chat_data["inv"])
        tmc_element = await inv.tmc.aget(id=context.chat_data["tmc"])
        tmc = await CustomTable.objects.aget(id=tmc_element.tmc_id)
        fields = []
        async for e in tmc_element.field.filter(text=None, file_id=None):
            f = await BaseCustomField.objects.aget(id=e.field_id)
            fields.append({"name": f.name, "id": e.id})
        context.chat_data["tmc"] = tmc_element.id
        context.chat_data["step"] = "add_element"
        text = "\n".join(
            [
                self.format_inv(inv),
                self.format_tmc_count(await self.get_tmc_count(inv)),
                self.format_tmc_name(tmc),
                "Что вы загружаете?",
            ]
        )
        keyboard = _inline_keyboard(fields, context.chat_data["step"], 1)
        keyboard.append(self.stop_inv_button())
        await update.effective_message.reply_markdown_v2(
            text,
            reply_parameters=ReplyParameters(
                message_id=update.effective_message.message_id
            ),
            reply_markup=InlineKeyboardMarkup(keyboard),
        )

    async def _step_add_element(self, update, context):
        """Ask the user to send the photo or text for the selected field."""
        inv = await TgItem.objects.aget(id=context.chat_data["inv"])
        tmc_element = await inv.tmc.aget(id=context.chat_data["tmc"])
        tmc = await CustomTable.objects.aget(id=tmc_element.tmc_id)
        element = await TmcField.objects.aget(id=context.chat_data["element"])
        field = await BaseCustomField.objects.aget(id=element.field_id)
        context.chat_data["step"] = "add_element_data"
        await update.effective_message.reply_markdown_v2(
            "\n".join(
                [
                    self.format_inv(inv),
                    self.format_tmc_count(await self.get_tmc_count(inv)),
                    self.format_tmc_name(tmc),
                    self.format_element(field),
                    "Загрузите фото или пришлите текст",
                ]
            ),
            reply_parameters=ReplyParameters(
                message_id=update.effective_message.message_id
            ),
            reply_markup=ReplyKeyboardRemove(),
        )

    async def _step_add_element_data(self, update, context):
        """Store the received photo or text, then move to the next step."""
        inv = await TgItem.objects.aget(id=context.chat_data["inv"])
        tmc_element = await inv.tmc.aget(id=context.chat_data["tmc"])
        element = await TmcField.objects.aget(id=context.chat_data["element"])
        if update.message.photo:
            # The last entry of the photo list is the one that gets stored.
            element.file_id = update.message.photo[-1].file_id
        elif update.message.text:
            element.text = update.message.text
        await element.asave()
        await inv.asave()
        await update.effective_message.reply_markdown_v2(
            "Изображение сохранено" if update.message.photo else "Текст сохранен"
        )
        empty_fields = await tmc_element.field.filter(
            text=None, file_id=None
        ).acount()
        # Fields left: pick the next one; otherwise go back to choosing a TMC.
        context.chat_data["step"] = "add_tmc" if empty_fields > 0 else "name"
        await self.inv(update, context)

    async def error_handler(
        self, update: object, context: ContextTypes.DEFAULT_TYPE
    ) -> None:
        """Log the error raised while handling an update, with its traceback."""
        # traceback.format_exception returns the usual python message about an exception, but as a
        # list of strings rather than a single string, so we have to join them together.
        tb_list = traceback.format_exception(
            None, context.error, context.error.__traceback__
        )
        tb_string = "".join(tb_list)

        # Build the message with some markup and additional information about what happened.
        # You might need to add some logic to deal with messages longer than the 4096 character limit.
        # NOTE(review): `message` is built but not sent anywhere in the code visible here.
        update_str = update.to_dict() if isinstance(update, Update) else str(update)
        message = (
            "An exception was raised while handling an update\n"
            f"<pre>update = {html.escape(json.dumps(update_str, indent=2, ensure_ascii=False))}"
            "</pre>\n\n"
            f"<pre>context.chat_data = {html.escape(str(context.chat_data))}</pre>\n\n"
            f"<pre>context.user_data = {html.escape(str(context.user_data))}</pre>\n\n"
            f"<pre>{html.escape(tb_string)}</pre>"
        )
        logger.info(f"error in tgbot {context.error}\n{tb_string}\nReply update")