import traceback
import html
import json
import logging
import re

import more_itertools as mit
from telegram import (
    Update,
    ReplyParameters,
    ReplyKeyboardMarkup,
    ReplyKeyboardRemove,
    KeyboardButton,
    InlineKeyboardMarkup,
    InlineKeyboardButton,
)
from telegram.ext import (
    ApplicationBuilder,
    CommandHandler,
    MessageHandler,
    CallbackQueryHandler,
    CallbackContext,
    ContextTypes,
    filters,
)
from telegram.constants import ParseMode

from django.conf import settings
from django.urls import reverse
from django.db import models

from .models import TgItem, TmcElement, TmcField
from .tasks import upload_file
from tmc.models import CustomTable, BaseCustomField, Territory, TerritoryItem

logger = logging.getLogger(__name__)


def chunk(n, l):
    """Split sequence ``l`` into consecutive slices of at most ``n`` items."""
    return [l[i : i + n] for i in range(0, len(l), n)]


class TgBot:
    """Telegram bot that drives the inventory dialogue.

    The conversation state lives in ``context.chat_data``:

    * ``step`` -- name of the current dialogue step;
    * ``terdeep_value`` -- id of the selected ``TerritoryItem``;
    * ``inv`` / ``tmc`` / ``element`` -- ids of the ``TgItem``,
      ``TmcElement`` and ``TmcField`` currently being edited;
    * ``<prefix>_start`` / ``<prefix>_count`` / ``<prefix>_renew`` -- paging
      state for the ``ter`` and ``terdeep`` lists.
    """

    # Not read anywhere in this class; the built application is stored on
    # ``TgBot.app`` by ``__init__``.
    _app = None

    def __init__(self) -> None:
        """Read the bot settings and build the shared application.

        Raises:
            Exception: if the token or the base URL is not configured.
        """
        self.token = settings.TGBOT["TOKEN"]
        self.baseurl = settings.TGBOT["BASE_URL"]
        self.webhook = settings.TGBOT["WEBHOOK_URL"]
        if not self.token or not self.baseurl:
            raise Exception("no token or baseurl")
        # ``updater(None)``: the application is built without an Updater.
        TgBot.app = ApplicationBuilder().token(self.token).updater(None).build()

    # ------------------------------------------------------------------
    # Small private helpers
    # ------------------------------------------------------------------

    @staticmethod
    def _reply_params(update: Update) -> ReplyParameters:
        """Return reply parameters that quote the message of ``update``."""
        return ReplyParameters(message_id=update.effective_message.message_id)

    @staticmethod
    def _choice_keyboard(items, per_row, action):
        """Build inline-button rows with ``<action>@<id>`` callback data.

        ``items`` is a list of ``{"name": ..., "id": ...}`` dicts.
        """
        return [
            [
                InlineKeyboardButton(
                    item["name"], callback_data=f'{action}@{item["id"]}'
                )
                for item in row
            ]
            for row in chunk(per_row, items)
        ]

    @staticmethod
    async def _dismiss_keyboard(update: Update) -> None:
        """Remove the inline keyboard of the pressed message and answer the query."""
        await update.effective_message.edit_reply_markup(InlineKeyboardMarkup([]))
        await update.callback_query.answer()

    # ------------------------------------------------------------------
    # Application setup
    # ------------------------------------------------------------------

    async def set_webhook(self):
        """Register the webhook URL with Telegram, dropping pending updates."""
        await TgBot.app.bot.set_webhook(
            f"https://{self.baseurl}{reverse('tgitem-send-tg-data')}",
            drop_pending_updates=True,
        )

    async def start_app(self):
        """Initialize the shared application."""
        await TgBot.app.initialize()

    async def admin_action(self, name, queryset):
        """Run an admin lookup and store its result on the updater instance.

        ``name`` selects the lookup (``admin_get_image`` -> file path of a
        file id, ``admin_get_name`` -> effective name of a chat id) and
        ``queryset`` is the single id to look up. The result is stored in
        ``tg_bot_updater_instance.return_values[queryset]``; ``None`` is
        stored when the lookup fails. Unknown names are ignored.
        """
        # Imported here rather than at module level, as in the original code.
        from .updater import tg_bot_updater_instance

        item = queryset
        try:
            if name == "admin_get_image":
                value = (await TgBot.app.bot.get_file(item)).file_path
            elif name == "admin_get_name":
                value = (await TgBot.app.bot.get_chat(item)).effective_name
            else:
                return
        except Exception:
            # Best effort: the caller receives None, but the failure is
            # no longer swallowed silently.
            logger.warning(
                "admin action %s failed for %r", name, item, exc_info=True
            )
            value = None
        tg_bot_updater_instance.return_values[item] = value

    async def set_handlers(self):
        """Register every command, message, callback and error handler."""
        app = TgBot.app
        private = filters.ChatType.PRIVATE

        app.add_handler(CommandHandler("start", self.start, private))
        app.add_handler(CommandHandler("my", self.my, private))
        app.add_handler(CommandHandler("inv", self.inv, private))
        # Free text and photos feed the current inventory step.
        app.add_handler(
            MessageHandler(
                private & ~filters.COMMAND & (filters.TEXT | filters.PHOTO),
                self.inv,
            )
        )

        app.add_handler(CommandHandler("ter", self.ter, private))
        app.add_handler(CallbackQueryHandler(self.ter_back, "ter_back"))
        app.add_handler(CallbackQueryHandler(self.ter_next, "ter_next"))
        app.add_handler(
            MessageHandler(
                private & filters.Regex(r"/ter_(\d.*)"),
                self.terdeep,
            )
        )
        app.add_handler(CallbackQueryHandler(self.terdeep_back, "terdeep_back"))
        app.add_handler(CallbackQueryHandler(self.terdeep_next, "terdeep_next"))
        app.add_handler(
            MessageHandler(
                private & filters.Regex(r"/terdeep_(\d.*)"),
                self.terdeep_set,
            )
        )

        app.add_handler(CallbackQueryHandler(self.stop_inv, "stop_inv"))
        app.add_handler(CallbackQueryHandler(self.get_inv, r"get_inv@(.*?)"))
        app.add_handler(CallbackQueryHandler(self.add_tmc, r"add_tmc@(.*?)"))
        app.add_handler(
            CallbackQueryHandler(self.add_element, r"add_element@(.*?)")
        )
        app.add_error_handler(self.error_handler)

    # ------------------------------------------------------------------
    # Simple commands
    # ------------------------------------------------------------------

    async def start(self, update: Update, context: CallbackContext):
        """Reply with the list of available commands."""
        # The backslashes are part of the text that is sent; they are written
        # as "\\" because "\-" is an invalid escape sequence in a normal
        # string literal.
        await update.effective_message.reply_text(
            (
                "Это бот для проведения инвентаризации\n"
                "/ter \\-\\- список территорий\n"
                "/inv \\-\\- начать новую инвентаризацию\n"
                "/my \\-\\- продолжить инвентаризацию\n"
            ),
            reply_parameters=self._reply_params(update),
        )

    async def my(self, update: Update, context: CallbackContext):
        """List the inventories of the current user as inline buttons."""
        user = update.effective_user
        current_step = context.chat_data.get("step", None)
        logger.info(f"Step {current_step} from user {user.full_name}")

        inv = [
            {"name": e.name, "id": str(e.id)}
            async for e in TgItem.objects.filter(user_id=user.id)
        ]
        if not inv:
            await update.message.reply_text(
                "У вас нет доступных для редактирования инвентаризаций"
            )
            return

        await update.message.reply_text(
            "Ваши инвентаризации",
            reply_markup=InlineKeyboardMarkup(
                self._choice_keyboard(inv, 1, "get_inv")
            ),
            reply_parameters=ReplyParameters(message_id=update.message.message_id),
        )

    # ------------------------------------------------------------------
    # Text formatting
    # ------------------------------------------------------------------

    def format_username(self, user):
        """Return the header line identifying the user."""
        return f"Специалист {user.name or user.full_name}, ID {user.id}"

    def format_inv(self, inv):
        """Return the line describing an inventory (name and creation date)."""
        return f"Инвентаризация `{inv.name}` от `{inv.created_at.strftime('%x')}`"

    async def get_tmc_count(self, inv):
        """Return ``[{"tmc__name": ..., "count": ...}, ...]`` for ``inv``."""
        counted = inv.tmc.values("tmc__name").annotate(
            count=models.Count("tmc__name")
        )
        return [row async for row in counted]

    def format_tmc_count(self, tmc_count):
        """Format the rows produced by ``get_tmc_count`` as a single line."""
        # "\\(" keeps the literal backslash of the original text without the
        # invalid "\(" escape sequence.
        per_tmc = ", ".join(
            f"{e['tmc__name']} \\({e['count']}\\)" for e in tmc_count
        )
        return " ".join(["Всего ТМЦ:", per_tmc])

    def format_tmc_name(self, tmc):
        """Return the line naming the TMC table."""
        return f"Название ТМЦ `{tmc.name}`"

    def format_element(self, field):
        """Return the line naming the field being filled in."""
        return f"Элемент `{field.name}`"

    def stop_inv_button(self):
        """Return the keyboard row holding the stop button."""
        return [
            InlineKeyboardButton(
                "❌❌❌ Остановить инвентаризацию", callback_data="stop_inv"
            )
        ]

    # ------------------------------------------------------------------
    # Inline-button callbacks of the inventory dialogue
    # ------------------------------------------------------------------

    async def stop_inv(self, update: Update, context: CallbackContext):
        """Abort the dialogue: clear the chat state and show the help text."""
        await self._dismiss_keyboard(update)
        context.chat_data.clear()
        await self.start(update, context)

    async def get_inv(self, update: Update, context: CallbackContext):
        """Resume the inventory whose id follows ``@`` in the callback data."""
        query = update.callback_query
        await self._dismiss_keyboard(update)
        inv_id = query.data.split("@")[-1]
        inv = await TgItem.objects.aget(id=inv_id)
        context.chat_data["terdeep_value"] = inv.location_id
        context.chat_data["inv"] = inv.id
        context.chat_data["step"] = "name"
        await self.inv(update, context)

    async def add_tmc(self, update: Update, context: CallbackContext):
        """Create a TMC element (with one empty field per table field).

        The table id follows ``@`` in the callback data; the new element is
        attached to the current inventory and the dialogue continues with
        the ``add_tmc`` step.
        """
        query = update.callback_query
        if query:
            await self._dismiss_keyboard(update)
        tmc_id = query.data.split("@")[-1]
        tmc = await CustomTable.objects.aget(id=tmc_id)
        tmc_element = await TmcElement.objects.acreate(tmc=tmc)
        inv = await TgItem.objects.aget(id=context.chat_data["inv"])
        async for f in tmc.fields.all():
            tmc_field = await TmcField.objects.acreate(field=f)
            await tmc_element.field.aadd(tmc_field)
        await inv.tmc.aadd(tmc_element)
        await inv.asave()
        context.chat_data["tmc"] = tmc_element.id
        context.chat_data["step"] = "add_tmc"
        await self.inv(update, context)

    async def add_element(self, update: Update, context: CallbackContext):
        """Select the field (id after ``@`` in the callback data) to fill in."""
        query = update.callback_query
        if query:
            await self._dismiss_keyboard(update)
        inv = await TgItem.objects.aget(id=context.chat_data["inv"])
        tmc = await inv.tmc.aget(id=context.chat_data["tmc"])
        field_id = query.data.split("@")[-1]
        tmc_field = await tmc.field.aget(id=field_id)
        context.chat_data["element"] = tmc_field.id
        context.chat_data["step"] = "add_element"
        await self.inv(update, context)

    # ------------------------------------------------------------------
    # Paged territory lists
    # ------------------------------------------------------------------

    def set_step(self, update: Update, context: CallbackContext, prefix="ter"):
        """Reset the paging state of list ``prefix`` and make it the current step."""
        context.chat_data["step"] = f"{prefix}"
        context.chat_data[f"{prefix}_start"] = 0
        context.chat_data[f"{prefix}_count"] = 10
        context.chat_data[f"{prefix}_renew"] = False

    async def step_btn(
        self, update: Update, context: CallbackContext, prefix="ter", step_type="plus"
    ):
        """Move list ``prefix`` one page forward/back and redraw it in place.

        ``step_type`` is ``"plus"`` or ``"minus"``; any other value leaves
        the offset unchanged. The list is redrawn by calling the method
        named ``prefix``.
        """
        await update.callback_query.answer()
        start_key = f"{prefix}_start"
        page_size = context.chat_data[f"{prefix}_count"]
        # Tell ``paged_data`` to edit the existing message.
        context.chat_data[f"{prefix}_renew"] = True
        if step_type == "plus":
            context.chat_data[start_key] += page_size
        elif step_type == "minus":
            # Never go below the first page (e.g. a stale "←" button):
            # a negative start would slice from the end of the list.
            context.chat_data[start_key] = max(
                0, context.chat_data[start_key] - page_size
            )
        await getattr(self, prefix)(update, context)

    async def paged_data(
        self, update: Update, context: CallbackContext, prefix="ter", queryset=None
    ):
        """Show one page of ``queryset`` as ``/<prefix>_<id> <name>`` lines.

        Args:
            prefix: list name; selects the ``<prefix>_*`` paging keys in
                ``chat_data`` and the command/callback names.
            queryset: async-iterable of objects with ``id`` and ``name``;
                ``None`` means "no rows".

        The message is edited in place when ``<prefix>_renew`` is set and
        sent as a reply otherwise.
        """
        user = update.effective_user
        current_step = context.chat_data.get("step", None)
        logger.info(f"Step {current_step} from user {user.full_name}")

        locations = []
        if queryset is not None:
            locations = [{"name": e.name, "id": e.id} async for e in queryset]

        step_start = context.chat_data[f"{prefix}_start"]
        step_end = step_start + context.chat_data[f"{prefix}_count"]
        text = "\n".join(
            [
                self.format_username(user),
                "Выберите объект:",
                "\n".join(
                    f"/{prefix}_{i['id']} {i['name']}"
                    for i in locations[step_start:step_end]
                ),
            ]
        )

        # Navigation arrows appear only when there is a page to go to.
        nav_row = []
        if step_start > 0:
            nav_row.append(
                InlineKeyboardButton(text="←", callback_data=f"{prefix}_back")
            )
        if step_end < len(locations):
            nav_row.append(
                InlineKeyboardButton(text="→", callback_data=f"{prefix}_next")
            )
        markup = InlineKeyboardMarkup(inline_keyboard=[nav_row])

        logger.info(update.message)
        if context.chat_data.get(f"{prefix}_renew", None):
            await update.effective_message.edit_text(text, reply_markup=markup)
        else:
            await update.effective_message.reply_text(
                text,
                reply_parameters=self._reply_params(update),
                reply_markup=markup,
            )

    async def ter_back(self, update: Update, context: CallbackContext):
        """Show the previous page of territories."""
        await self.step_btn(update, context, "ter", "minus")

    async def ter_next(self, update: Update, context: CallbackContext):
        """Show the next page of territories."""
        await self.step_btn(update, context, "ter", "plus")

    async def ter(self, update: Update, context: CallbackContext):
        """Show the paged list of territories."""
        if not update.callback_query:
            # Called from the /ter command: start from the first page.
            self.set_step(update, context, "ter")
        await self.paged_data(update, context, "ter", Territory.objects.all())

    async def terdeep_back(self, update: Update, context: CallbackContext):
        """Show the previous page of territory items."""
        await self.step_btn(update, context, "terdeep", "minus")

    async def terdeep_next(self, update: Update, context: CallbackContext):
        """Show the next page of territory items."""
        await self.step_btn(update, context, "terdeep", "plus")

    async def terdeep(self, update: Update, context: CallbackContext):
        """Show the paged list of items of the territory picked via ``/ter_<id>``."""
        if not update.callback_query:
            self.set_step(update, context, "terdeep")
        parent = context.chat_data.get("terdeep", None)
        picked = re.search(r"/ter_(\d.*)", update.effective_message.text)
        if picked:
            # A new parent was chosen: remember it and send a fresh message.
            parent = picked.group(1)
            context.chat_data["terdeep"] = parent
            context.chat_data["terdeep_renew"] = False
        await self.paged_data(
            update, context, "terdeep", TerritoryItem.objects.filter(parent_id=parent)
        )

    async def terdeep_set(self, update: Update, context: CallbackContext):
        """Store the territory item picked via ``/terdeep_<id>``."""
        # The handler is registered with the same regex, so it matches here.
        context.chat_data["terdeep_value"] = re.search(
            r"/terdeep_(\d.*)", update.effective_message.text
        ).group(1)
        text = (
            "Вы выбрали территорию инвентаризации\n"
            "Теперь вы можете начать инвентаризацию /inv"
        )
        await update.effective_message.reply_text(
            text=text,
            reply_parameters=self._reply_params(update),
        )

    # ------------------------------------------------------------------
    # Inventory state machine
    # ------------------------------------------------------------------

    async def inv(self, update: Update, context: CallbackContext):
        """Advance the inventory dialogue by one step.

        The step is read from ``chat_data["step"]``:

        * ``name`` -- create the inventory from the typed name (or load the
          one in ``chat_data["inv"]``) and offer the TMC tables;
        * ``add_tmc`` -- offer the still-empty fields of the current element;
        * ``add_element`` -- ask for a photo or text for the chosen field;
        * ``add_element_data`` -- store the photo/text and go back to
          ``add_tmc`` (fields left) or ``name`` (element complete).

        The ``/inv`` command itself starts a new inventory: it drops the
        ids left from a previous dialogue, asks for the name and waits for
        the next message.
        """
        user = update.effective_user
        current_step = context.chat_data.get("step", None)
        logger.info(f"Step {current_step} from user {user.full_name}")

        if not context.chat_data.get("terdeep_value", None):
            await update.effective_message.reply_text(
                text="Вы не выбрали территорию /ter",
                reply_parameters=self._reply_params(update),
            )
            return

        if update.effective_message.text == "/inv":
            context.chat_data["step"] = "name"
            # "/inv" starts a NEW inventory: forget the ids of the previous
            # one, otherwise the typed name would be ignored and the old
            # inventory reused.
            for stale_key in ("inv", "tmc", "element"):
                context.chat_data.pop(stale_key, None)
            current_ter = await TerritoryItem.objects.aget(
                id=context.chat_data.get("terdeep_value", None)
            )
            await update.effective_message.reply_text(
                "\n".join([self.format_username(user), "Введите название объекта"]),
                reply_parameters=self._reply_params(update),
                reply_markup=ReplyKeyboardMarkup([[KeyboardButton(current_ter.name)]]),
            )
            # Wait for the name; do not also run the handler of the step
            # that was active before the command.
            return

        if current_step == "name":
            if not context.chat_data.get("inv", None):
                current_ter = await TerritoryItem.objects.aget(
                    id=context.chat_data.get("terdeep_value", None)
                )
                inv = await TgItem.objects.acreate(
                    user_id=user.id, location=current_ter
                )
                inv.name = update.message.text
                await inv.asave()
                # Sent only to remove the reply keyboard, then deleted.
                res = await update.effective_message.reply_text(
                    "Ок, сохранено", reply_markup=ReplyKeyboardRemove()
                )
                await res.delete()
            else:
                inv = await TgItem.objects.aget(id=context.chat_data["inv"])

            tmc = [
                {"name": e.name, "id": e.id}
                async for e in CustomTable.objects.all()
            ]
            context.chat_data["inv"] = inv.id
            context.chat_data["step"] = "add_tmc"
            text = "\n".join(
                [
                    self.format_inv(inv),
                    self.format_tmc_count(await self.get_tmc_count(inv)),
                    "Выберите, какую ТМЦ вы осматриваете:",
                ]
            )
            keyboard = self._choice_keyboard(tmc, 2, context.chat_data["step"])
            keyboard.append(self.stop_inv_button())
            await update.effective_message.reply_text(
                text,
                reply_parameters=self._reply_params(update),
                reply_markup=InlineKeyboardMarkup(keyboard),
            )
        elif current_step == "add_tmc":
            inv = await TgItem.objects.aget(id=context.chat_data["inv"])
            tmc_element = await inv.tmc.aget(id=context.chat_data["tmc"])
            tmc = await CustomTable.objects.aget(id=tmc_element.tmc_id)
            # Only fields that have neither text nor a file yet are offered.
            fields = []
            async for e in tmc_element.field.filter(text=None, file_id=None):
                f = await BaseCustomField.objects.aget(id=e.field_id)
                fields.append({"name": f.name, "id": e.id})
            context.chat_data["tmc"] = tmc_element.id
            context.chat_data["step"] = "add_element"
            text = "\n".join(
                [
                    self.format_inv(inv),
                    self.format_tmc_count(await self.get_tmc_count(inv)),
                    self.format_tmc_name(tmc),
                    "Что вы загружаете?",
                ]
            )
            keyboard = self._choice_keyboard(fields, 1, context.chat_data["step"])
            keyboard.append(self.stop_inv_button())
            await update.effective_message.reply_text(
                text,
                reply_parameters=self._reply_params(update),
                reply_markup=InlineKeyboardMarkup(keyboard),
            )
        elif current_step == "add_element":
            inv = await TgItem.objects.aget(id=context.chat_data["inv"])
            tmc_element = await inv.tmc.aget(id=context.chat_data["tmc"])
            tmc = await CustomTable.objects.aget(id=tmc_element.tmc_id)
            element = await TmcField.objects.aget(id=context.chat_data["element"])
            field = await BaseCustomField.objects.aget(id=element.field_id)
            context.chat_data["step"] = "add_element_data"
            await update.effective_message.reply_text(
                "\n".join(
                    [
                        self.format_inv(inv),
                        self.format_tmc_count(await self.get_tmc_count(inv)),
                        self.format_tmc_name(tmc),
                        self.format_element(field),
                        "Загрузите фото или пришлите текст",
                    ]
                ),
                reply_parameters=self._reply_params(update),
                reply_markup=ReplyKeyboardRemove(),
            )
        elif current_step == "add_element_data":
            inv = await TgItem.objects.aget(id=context.chat_data["inv"])
            tmc_element = await inv.tmc.aget(id=context.chat_data["tmc"])
            element = await TmcField.objects.aget(id=context.chat_data["element"])
            if update.message.photo:
                # The last entry of ``photo`` is used, as in the original code.
                element.file_id = update.message.photo[-1].file_id
                upload_file.delay(element.file_id)
            elif update.message.text:
                element.text = update.message.text
            await element.asave()
            await inv.asave()
            await update.effective_message.reply_text(
                "Изображение сохранено" if update.message.photo else "Текст сохранен"
            )
            empty_fields = await tmc_element.field.filter(
                text=None, file_id=None
            ).acount()
            # Fields left -> offer them again; otherwise back to the TMC list.
            context.chat_data["step"] = "add_tmc" if empty_fields > 0 else "name"
            await self.inv(update, context)
        else:
            logger.info(f"no step for update {update}")
            # The message did not advance the dialogue: drop the stale state.
            if (
                "step" in context.chat_data
                and context.chat_data["step"] == current_step
            ):
                context.chat_data["step"] = None
                context.chat_data["inv"] = None

    # ------------------------------------------------------------------
    # Error handling
    # ------------------------------------------------------------------

    async def error_handler(
        self, update: object, context: ContextTypes.DEFAULT_TYPE
    ) -> None:
        """Log the exception raised while handling ``update``."""
        # traceback.format_exception returns the usual python message about
        # an exception, but as a list of strings rather than a single string,
        # so we have to join them together.
        tb_list = traceback.format_exception(
            None, context.error, context.error.__traceback__
        )
        tb_string = "".join(tb_list)

        # Build the message with some markup and additional information
        # about what happened. It is currently only built, not sent.
        # NOTE(review): the <pre> wrappers follow the python-telegram-bot
        # error-handler example this code is based on -- confirm against the
        # original file.
        update_str = update.to_dict() if isinstance(update, Update) else str(update)
        message = (
            "An exception was raised while handling an update\n"
            f"<pre>update = {html.escape(json.dumps(update_str, indent=2, ensure_ascii=False))}"
            "</pre>\n\n"
            f"<pre>context.chat_data = {html.escape(str(context.chat_data))}</pre>\n\n"
            f"<pre>context.user_data = {html.escape(str(context.user_data))}</pre>\n\n"
            f"<pre>{html.escape(tb_string)}</pre>"
        )
        logger.info(f"error in tgbot {context.error}\n{tb_string}\nReply update")