to_inventory/back/tgbot/tgbot.py

263 lines
10 KiB
Python

import traceback
import html
import json
import logging
import re
import more_itertools as mit
from telegram import (
Update,
ReplyParameters,
ReplyKeyboardMarkup,
ReplyKeyboardRemove,
KeyboardButton,
)
from telegram.ext import (
ApplicationBuilder,
CommandHandler,
MessageHandler,
CallbackQueryHandler,
CallbackContext,
ContextTypes,
filters,
)
from telegram.constants import ParseMode
from django.conf import settings
from .apps import TgBotUpdater
from .models import Element, TgItem
from tmc.models import CustomTable
logger = logging.getLogger(__name__)
def chunk(n, l):
return [l[i : i + n] for i in range(0, len(l), n)]
class TgBot:
_app = None
def __init__(self) -> None:
self.token = settings.TGBOT["TOKEN"]
self.baseurl = settings.TGBOT["BASE_URL"]
self.webhook = settings.TGBOT["WEBHOOK_URL"]
if not self.token or not self.baseurl:
raise Exception("no token or baseurl")
TgBot.app = (
ApplicationBuilder().token(settings.TGBOT["TOKEN"]).updater(None).build()
)
async def set_webhook(self):
await TgBot.app.bot.setWebhook(
f"https://{self.baseurl}/{self.webhook}/", drop_pending_updates=True
)
async def start_app(self):
await TgBot.app.initialize()
async def admin_action(self, name, queryset):
pass
async def set_handlers(self):
TgBot.app.add_handler(CommandHandler("start", self.start, filters.ChatType.PRIVATE))
TgBot.app.add_handler(CommandHandler("my", self.my, filters.ChatType.PRIVATE))
TgBot.app.add_handler(CommandHandler("inv", self.inv, filters.ChatType.PRIVATE))
TgBot.app.add_handler(MessageHandler(filters.ChatType.PRIVATE, self.inv))
TgBot.app.add_error_handler(self.error_handler)
async def start(self, update: Update, context: CallbackContext):
await update.message.reply_markdown_v2(
(
"Это бот для проведения инвентаризации\n"
"/my \-\- продолжить инвентаризацию\n"
"/inv \-\- начать новую инвентаризацию"
),
# reply_markup=ForceReply(selective=True),
reply_parameters=ReplyParameters(message_id=update.message.message_id),
)
async def my(self, update: Update, context: CallbackContext):
user = update.effective_user
current_step = context.chat_data.get("step", None)
logger.info(f"Step {current_step} from user {user.full_name}")
inv = []
async for e in TgItem.objects.filter(user_id=user.id):
inv.append(e.name)
keys = chunk(1, inv)
await update.message.reply_markdown_v2(
("Ваши инвентаризации"),
reply_markup=ReplyKeyboardMarkup(
[[KeyboardButton(i) for i in arr] for arr in keys]
),
reply_parameters=ReplyParameters(message_id=update.message.message_id),
)
if len(inv):
context.chat_data["step"] = "get_inv"
async def inv(self, update: Update, context: CallbackContext):
user = update.effective_user
current_step = context.chat_data.get("step", None)
if not update.message:
return True
logger.info(update.message)
logger.info(f"Step {current_step} from user {user.full_name}")
if current_step == "get_inv":
inv = await TgItem.objects.aget(name=update.message.text)
context.chat_data["inv"] = inv.id
context.chat_data["step"] = "name"
current_step = context.chat_data["step"]
if not current_step and update.message.text == "/inv":
inv = await TgItem.objects.acreate(user_id=user.id)
await update.message.reply_markdown_v2(
(
f"Специалист {user.name or user.full_name}, ID {user.id}\n"
f"Начинаем инвентаризацию `#{inv.id}`\n"
f"Введите название объекта"
),
reply_parameters=ReplyParameters(message_id=update.message.message_id),
)
context.chat_data["inv"] = inv.id
context.chat_data["step"] = "name"
elif current_step == "name":
inv = await TgItem.objects.aget(id=context.chat_data["inv"])
inv.name = update.message.text
await inv.asave()
tmc = []
async for e in CustomTable.objects.all():
tmc.append(e.name)
keys = chunk(3, tmc)
await update.message.reply_markdown_v2(
(
f"Инвентаризация `#{inv.id}`\n"
f"Название объекта `{inv.name}`\n"
f"Количество ТМЦ {await inv.tmc.acount()}"
),
reply_parameters=ReplyParameters(message_id=update.message.message_id),
reply_markup=ReplyKeyboardMarkup(
[[KeyboardButton(i) for i in arr] for arr in keys]
),
)
logger.info(tmc)
context.chat_data["step"] = "add_tmc"
elif current_step == "add_tmc":
tmc_name = update.message.text
tmc = await CustomTable.objects.aget(name=tmc_name)
inv = await TgItem.objects.aget(id=context.chat_data["inv"])
await inv.tmc.aadd(tmc)
fields = []
async for e in tmc.fields.all():
fields.append(e.name)
keys = chunk(1, fields)
await update.message.reply_markdown_v2(
(
f"Инвентаризация `#{inv.id}`\n"
f"Название объекта `{inv.name}`\n"
f"Название ТМЦ `{tmc.name}`\n"
f"Что вы загружаете?"
),
reply_parameters=ReplyParameters(message_id=update.message.message_id),
reply_markup=ReplyKeyboardMarkup(
[[KeyboardButton(i) for i in arr] for arr in keys]
),
)
context.chat_data["tmc"] = tmc.id
context.chat_data["step"] = "add_field"
elif current_step == "add_field":
tmc = await CustomTable.objects.aget(id=context.chat_data["tmc"])
element_name = update.message.text
element = await Element.objects.acreate(name=element_name, tmc=tmc)
inv = await TgItem.objects.aget(id=context.chat_data["inv"])
await inv.element.aadd(element)
await update.message.reply_markdown_v2(
(
f"Инвентаризация `#{inv.id}`\n"
f"Название объекта `{inv.name}`\n"
f"Название ТМЦ `{tmc.name}`\n"
f"Название элемента `{element.name}`\n"
f"Загрузите фото или пришлите текст"
),
reply_parameters=ReplyParameters(message_id=update.message.message_id),
reply_markup=ReplyKeyboardRemove(),
)
context.chat_data["element"] = element.id
context.chat_data["step"] = "add_field_data"
elif current_step == "add_field_data":
inv = await TgItem.objects.aget(id=context.chat_data["inv"])
tmc = await CustomTable.objects.aget(id=inv.tmc)
element = await Element.objects.aget(id=inv.element)
if update.message.photo:
element.photoid = update.message.photo[-1].file_id
elif update.message.text:
element.text = update.message.text
await element.asave()
await update.message.reply_markdown_v2(
(
f"Инвентаризация `#{inv.id}`\n"
f"Название объекта `{inv.name}`\n"
f"Название ТМЦ `{tmc.name}`\n"
f"Название элемента `{element.name}`\n"
f"Данные загружены"
),
reply_parameters=ReplyParameters(message_id=update.message.message_id),
reply_markup=ReplyKeyboardRemove(),
)
context.chat_data["element"] = element.id
context.chat_data["step"] = "add_field_data"
else:
logger.info(update.message.entities)
logger.info(f"no step for update {update}")
if "step" in context.chat_data and context.chat_data["step"] == current_step:
context.chat_data["step"] = None
context.chat_data["inv"] = None
async def error_handler(
self, update: object, context: ContextTypes.DEFAULT_TYPE
) -> None:
"""Log the error and send a telegram message to notify the developer."""
# Log the error before we do anything else, so we can see it even if something breaks.
logger.error("Exception while handling an update:", exc_info=context.error)
# traceback.format_exception returns the usual python message about an exception, but as a
# list of strings rather than a single string, so we have to join them together.
tb_list = traceback.format_exception(
None, context.error, context.error.__traceback__
)
tb_string = "".join(tb_list)
# Build the message with some markup and additional information about what happened.
# You might need to add some logic to deal with messages longer than the 4096 character limit.
update_str = update.to_dict() if isinstance(update, Update) else str(update)
message = (
"An exception was raised while handling an update\n"
f"<pre>update = {html.escape(json.dumps(update_str, indent=2, ensure_ascii=False))}"
"</pre>\n\n"
f"<pre>context.chat_data = {html.escape(str(context.chat_data))}</pre>\n\n"
f"<pre>context.user_data = {html.escape(str(context.user_data))}</pre>\n\n"
f"<pre>{html.escape(tb_string)}</pre>"
)
logger.error(context.error)
logger.info(f"error in tgbot {context.error}\nReply update")
TgBotUpdater.my_queue.put(update)