to_inventory/back/tgbot/tgbot.py

348 lines
13 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import html
import json
import logging
import re
import traceback

import more_itertools as mit
from django.conf import settings
from telegram import (
    Update,
    ReplyParameters,
    ReplyKeyboardMarkup,
    ReplyKeyboardRemove,
    KeyboardButton,
    InlineKeyboardMarkup,
    InlineKeyboardButton,
)
from telegram.constants import ParseMode
from telegram.ext import (
    ApplicationBuilder,
    CommandHandler,
    MessageHandler,
    CallbackQueryHandler,
    CallbackContext,
    ContextTypes,
    filters,
)
from telegram.helpers import escape_markdown

from tmc.models import CustomTable, BaseCustomField

from .apps import TgBotUpdater
from .models import TmcElement, TgItem
# Module-level logger, named after this module so log records can be filtered per module.
logger = logging.getLogger(__name__)
def chunk(n, l):
    """Split a sequence into consecutive slices of at most ``n`` items.

    Args:
        n: Maximum size of every slice; must be a positive integer.
        l: Any sliceable sequence with a length (list, tuple, str, ...).

    Returns:
        A list of slices of ``l`` in original order. Every slice has ``n``
        items except possibly the last one. An empty ``l`` gives ``[]``.

    Raises:
        ValueError: If ``n`` is smaller than 1. Without this check a negative
            size would silently return ``[]`` and drop every item.
    """
    if n < 1:
        raise ValueError(f"chunk size must be a positive integer, got {n!r}")
    return [l[start : start + n] for start in range(0, len(l), n)]
class TgBot:
    """Telegram bot that guides a user through an inventory session.

    The conversation state lives in ``context.chat_data`` under these keys:

    * ``"step"``    -- name of the step the next incoming update belongs to
      (``"name"``, ``"add_tmc"``, ``"add_element"``, ``"add_element_data"``
      or ``None`` when idle);
    * ``"inv"``     -- id of the ``TgItem`` being edited;
    * ``"tmc"``     -- id of the selected ``CustomTable``;
    * ``"element"`` -- id of the ``TmcElement`` waiting for its data.

    The built ``telegram.ext.Application`` is stored on the class as
    ``TgBot.app`` and is therefore shared by every instance.
    """

    # NOTE(review): nothing in this class reads ``_app``; the application is
    # stored as ``TgBot.app`` in ``__init__``. Kept in case other modules use it.
    _app = None

    def __init__(self) -> None:
        """Read the bot settings and build the shared application.

        Raises:
            Exception: If the token or the base URL is missing from
                ``settings.TGBOT``.
        """
        self.token = settings.TGBOT["TOKEN"]
        self.baseurl = settings.TGBOT["BASE_URL"]
        self.webhook = settings.TGBOT["WEBHOOK_URL"]
        if not self.token or not self.baseurl:
            raise Exception("no token or baseurl")
        # ``updater(None)``: no built-in updater is created for this application.
        TgBot.app = ApplicationBuilder().token(self.token).updater(None).build()

    async def set_webhook(self):
        """Register the webhook URL with Telegram, dropping pending updates."""
        await TgBot.app.bot.set_webhook(
            f"https://{self.baseurl}/{self.webhook}/", drop_pending_updates=True
        )

    async def start_app(self):
        """Initialize the shared application."""
        await TgBot.app.initialize()

    async def admin_action(self, name, queryset):
        """Placeholder; currently does nothing."""
        pass

    async def set_handlers(self):
        """Register command, message, callback-query and error handlers."""
        private = filters.ChatType.PRIVATE
        TgBot.app.add_handler(CommandHandler("start", self.start, private))
        TgBot.app.add_handler(CommandHandler("my", self.my, private))
        TgBot.app.add_handler(CommandHandler("inv", self.inv, private))
        # Every other private message is treated as input for the current step.
        TgBot.app.add_handler(MessageHandler(private, self.inv))
        TgBot.app.add_handler(CallbackQueryHandler(self.get_inv, r"get_inv@(.*?)"))
        TgBot.app.add_handler(CallbackQueryHandler(self.add_tmc, r"add_tmc@(.*?)"))
        TgBot.app.add_handler(
            CallbackQueryHandler(self.add_element, r"add_element@(.*?)")
        )
        TgBot.app.add_error_handler(self.error_handler)

    @staticmethod
    def _md(text) -> str:
        """Escape ``text`` for use as plain text in a MarkdownV2 message."""
        return escape_markdown(str(text), version=2)

    @staticmethod
    def _md_code(text) -> str:
        """Escape ``text`` for use inside a MarkdownV2 inline-code span."""
        return escape_markdown(str(text), version=2, entity_type="code")

    @staticmethod
    def _inline_keyboard(items, prefix, per_row):
        """Build an inline keyboard from ``{"name", "id"}`` dicts.

        Every button carries the callback data ``<prefix>@<id>``; buttons are
        laid out ``per_row`` to a row.
        """
        return InlineKeyboardMarkup(
            [
                [
                    InlineKeyboardButton(
                        item["name"],
                        callback_data=f'{prefix}@{item["id"]}',
                    )
                    for item in row
                ]
                for row in chunk(per_row, items)
            ]
        )

    async def _inv_header(self, inv) -> str:
        """Return the MarkdownV2 header shared by all inventory messages."""
        return (
            f"Инвентаризация `{self._md_code(inv.name)}` "
            f"от `{self._md_code(inv.created_at.strftime('%x'))}`\n"
            f"Количество ТМЦ {await inv.tmc.acount()}\n\n"
        )

    async def start(self, update: Update, context: CallbackContext):
        """Reply to ``/start`` with a short help text."""
        # ``effective_message`` also covers edited messages, for which
        # ``update.message`` is None.
        message = update.effective_message
        await message.reply_markdown_v2(
            (
                "Это бот для проведения инвентаризации\n"
                "/my \\-\\- продолжить инвентаризацию\n"
                "/inv \\-\\- начать новую инвентаризацию"
            ),
            reply_parameters=ReplyParameters(message_id=message.message_id),
        )

    async def my(self, update: Update, context: CallbackContext):
        """Reply to ``/my`` with the user's inventories as inline buttons."""
        user = update.effective_user
        message = update.effective_message
        current_step = context.chat_data.get("step", None)
        logger.info("Step %s from user %s", current_step, user.full_name)

        inventories = [
            {"name": item.name, "id": str(item.id)}
            async for item in TgItem.objects.filter(user_id=user.id)
        ]
        if inventories:
            await message.reply_markdown_v2(
                ("Ваши инвентаризации"),
                reply_markup=self._inline_keyboard(inventories, "get_inv", 1),
                reply_parameters=ReplyParameters(message_id=message.message_id),
            )
        else:
            await message.reply_markdown_v2(
                "У вас нет доступных для редактирования инвентаризаций"
            )

    async def get_inv(self, update: Update, context: CallbackContext):
        """Handle a ``get_inv@<id>`` button: select that inventory and continue."""
        query = update.callback_query
        # Remove the buttons so the same choice cannot be pressed twice.
        await update.effective_message.edit_reply_markup(InlineKeyboardMarkup([]))
        await query.answer()

        inv_id = query.data.split("@")[-1]
        inv = await TgItem.objects.aget(id=inv_id)
        context.chat_data["inv"] = inv.id
        context.chat_data["step"] = "name"
        await self.inv(update, context)

    async def add_tmc(self, update: Update, context: CallbackContext):
        """Handle an ``add_tmc@<id>`` button: attach the TMC to the inventory."""
        query = update.callback_query
        await update.effective_message.edit_reply_markup(InlineKeyboardMarkup([]))
        await query.answer()

        tmc_id = query.data.split("@")[-1]
        tmc = await CustomTable.objects.aget(id=tmc_id)
        inv = await TgItem.objects.aget(id=context.chat_data["inv"])
        await inv.tmc.aadd(tmc)
        context.chat_data["tmc"] = tmc.id
        context.chat_data["step"] = "add_tmc"
        await self.inv(update, context)

    async def add_element(self, update: Update, context: CallbackContext):
        """Handle an ``add_element@<id>`` button: create the element to fill in."""
        query = update.callback_query
        await update.effective_message.edit_reply_markup(InlineKeyboardMarkup([]))
        await query.answer()

        inv = await TgItem.objects.aget(id=context.chat_data["inv"])
        tmc = await CustomTable.objects.aget(id=context.chat_data["tmc"])
        field_id = query.data.split("@")[-1]
        name = await BaseCustomField.objects.aget(id=field_id)
        element = await TmcElement.objects.acreate(name=name, tmc=tmc)
        await inv.element.aadd(element)
        context.chat_data["element"] = element.id
        context.chat_data["step"] = "add_element"
        await self.inv(update, context)

    async def inv(self, update: Update, context: CallbackContext):
        """Advance the inventory conversation by one step.

        Called directly for ``/inv`` and for any other private message, and
        indirectly by the callback-query handlers after they have updated
        ``context.chat_data``. The branch taken depends on
        ``chat_data["step"]`` as it was on entry.
        """
        user = update.effective_user
        # ``update.message`` is None for callback queries and edited messages;
        # ``effective_message`` is set in all of those cases.
        message = update.effective_message
        reply_to = ReplyParameters(message_id=message.message_id)
        current_step = context.chat_data.get("step", None)
        logger.info(
            "Step %s from user %s in %s", current_step, user.full_name, message
        )

        if not current_step and message.text == "/inv":
            context.chat_data["step"] = "name"
            await message.reply_markdown_v2(
                (
                    # Usernames and full names may contain MarkdownV2 control
                    # characters (``_`` is very common), so escape them.
                    f"Специалист {self._md(user.name or user.full_name)}, ID {user.id}\n"
                    f"Введите название объекта"
                ),
                reply_parameters=reply_to,
            )
        elif current_step == "name":
            if not context.chat_data.get("inv", None):
                # No inventory selected yet: the incoming text is its name.
                inv = await TgItem.objects.acreate(user_id=user.id)
                inv.name = message.text
                await inv.asave()
            else:
                inv = await TgItem.objects.aget(id=context.chat_data["inv"])
            tables = [
                {"name": table.name, "id": table.id}
                async for table in CustomTable.objects.all()
            ]
            context.chat_data["inv"] = inv.id
            context.chat_data["step"] = "add_tmc"
            await message.reply_markdown_v2(
                (
                    f"{await self._inv_header(inv)}"
                    "Выберите, какую ТМЦ вы осматриваете:"
                ),
                reply_parameters=reply_to,
                reply_markup=self._inline_keyboard(tables, "add_tmc", 2),
            )
        elif current_step == "add_tmc":
            inv = await TgItem.objects.aget(id=context.chat_data["inv"])
            tmc = await CustomTable.objects.aget(id=context.chat_data["tmc"])
            await inv.tmc.aadd(tmc)
            fields = [
                {"name": field.name, "id": field.id}
                async for field in tmc.fields.all()
            ]
            context.chat_data["tmc"] = tmc.id
            context.chat_data["step"] = "add_element"
            await message.reply_markdown_v2(
                (
                    f"{await self._inv_header(inv)}"
                    f"Название ТМЦ `{self._md_code(tmc.name)}`\n"
                    f"Что вы загружаете?"
                ),
                reply_parameters=reply_to,
                reply_markup=self._inline_keyboard(fields, "add_element", 1),
            )
        elif current_step == "add_element":
            inv = await TgItem.objects.aget(id=context.chat_data["inv"])
            # ``aget`` returns a coroutine and must be awaited before use.
            tmc = await inv.tmc.aget(id=context.chat_data["tmc"])
            element = await inv.element.aget(id=context.chat_data["element"])
            context.chat_data["step"] = "add_element_data"
            # NOTE(review): ``element.name`` appears to be a relation (it is
            # created from a BaseCustomField instance); if so, this lazy access
            # may need ``select_related("name")`` in async code -- confirm.
            await message.reply_markdown_v2(
                (
                    f"{await self._inv_header(inv)}"
                    f"Название ТМЦ `{self._md_code(tmc.name)}`\n"
                    f"Элемент `{self._md_code(element.name)}`\n"
                    f"Загрузите фото или пришлите текст"
                ),
                reply_parameters=reply_to,
                reply_markup=ReplyKeyboardRemove(),
            )
        elif current_step == "add_element_data":
            inv = await TgItem.objects.aget(id=context.chat_data["inv"])
            tmc = await inv.tmc.aget(id=context.chat_data["tmc"])
            element = await inv.element.aget(id=context.chat_data["element"])
            if message.photo:
                # The last entry of ``photo`` is used as the stored file id.
                element.photoid = message.photo[-1].file_id
            elif message.text:
                element.text = message.text
            await element.asave()
            await message.reply_markdown_v2(
                (
                    f"{await self._inv_header(inv)}"
                    f"Название ТМЦ `{self._md_code(tmc.name)}`\n"
                    f"Элемент `{self._md_code(element.name)}`\n"
                    f"Данные загружены"
                ),
                reply_parameters=reply_to,
                reply_markup=ReplyKeyboardRemove(),
            )
            context.chat_data["element"] = element.id
            context.chat_data["step"] = "add_element_data"
        else:
            logger.info(message.entities)
            logger.info(f"no step for update {update}")

        # If the step did not advance during this call, end the session so the
        # next ``/inv`` starts from scratch.
        # NOTE(review): assumed to run after every branch, not only the
        # fallback one -- confirm against the original indentation.
        if "step" in context.chat_data and context.chat_data["step"] == current_step:
            context.chat_data["step"] = None
            context.chat_data["inv"] = None

    async def error_handler(
        self, update: object, context: ContextTypes.DEFAULT_TYPE
    ) -> None:
        """Log a handler exception and put the failed update back on the queue.

        No Telegram message is sent to anyone; the error is only logged.
        """
        # Log first, with the full traceback, so it is visible even if the
        # re-queueing below fails.
        logger.error("Exception while handling an update:", exc_info=context.error)
        logger.error(context.error)
        logger.info(f"error in tgbot {context.error}\nReply update")
        # NOTE(review): an update that always fails would be re-queued on every
        # failure -- confirm the consumer of ``my_queue`` guards against that.
        TgBotUpdater.my_queue.put(update)