# to_inventory/back/tgbot/tgbot.py
#
# NOTE: the repository web view flagged this file as containing ambiguous
# Unicode characters (characters that might be confused with other characters).

import traceback
import html
import json
import logging
import re
import more_itertools as mit
from telegram import (
Update,
ReplyParameters,
ReplyKeyboardMarkup,
ReplyKeyboardRemove,
KeyboardButton,
InlineKeyboardMarkup,
InlineKeyboardButton,
)
from telegram.ext import (
ApplicationBuilder,
CommandHandler,
MessageHandler,
CallbackQueryHandler,
CallbackContext,
ContextTypes,
filters,
)
from telegram.constants import ParseMode
from django.conf import settings
from django.urls import reverse
from django.db import models
from .apps import TgBotUpdater
from .models import TgItem, TmcElement, TmcField
from tmc.models import CustomTable, BaseCustomField, Territory, TerritoryItem
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
def chunk(n, l):
    """Split ``l`` into consecutive slices of at most ``n`` items.

    The last slice is shorter when ``len(l)`` is not a multiple of ``n``;
    an empty ``l`` yields an empty list.
    """
    pieces = []
    for offset in range(0, len(l), n):
        pieces.append(l[offset : offset + n])
    return pieces
# Wrapper around a python-telegram-bot application that drives the inventory dialogue.
class TgBot:
# NOTE(review): `_app` is never read in the visible code; the methods use the
# class attribute `TgBot.app`, which is assigned in __init__.
_app = None
def __init__(self) -> None:
    """Read the bot configuration and build the shared telegram application.

    The token, base URL and webhook URL come from ``settings.TGBOT``; the
    built application is stored on the class as ``TgBot.app``.

    Raises:
        Exception: if the token or the base URL is empty.
    """
    config = settings.TGBOT
    self.token = config["TOKEN"]
    self.baseurl = config["BASE_URL"]
    self.webhook = config["WEBHOOK_URL"]
    if not (self.token and self.baseurl):
        raise Exception("no token or baseurl")
    # The application is built with `updater(None)`, i.e. without an updater.
    builder = ApplicationBuilder().token(settings.TGBOT["TOKEN"]).updater(None)
    TgBot.app = builder.build()
async def set_webhook(self):
    """Register the webhook URL with Telegram, dropping pending updates.

    The URL is ``https://<baseurl>`` followed by the reversed
    ``tgitem-send-tg-data`` route.
    """
    endpoint = reverse("tgitem-send-tg-data")
    webhook_url = f"https://{self.baseurl}{endpoint}"
    await TgBot.app.bot.setWebhook(webhook_url, drop_pending_updates=True)
async def start_app(self):
    """Initialize the shared telegram application stored on ``TgBot.app``."""
    application = TgBot.app
    await application.initialize()
async def admin_action(self, name, queryset):
    """Resolve a Telegram file path or chat name and publish it for the caller.

    The result is written to ``TgBotUpdater.return_values`` under the key
    ``queryset``; ``None`` is stored when the Telegram lookup fails.

    Args:
        name: ``"admin_get_image"`` (look up a file and store its
            ``file_path``) or ``"admin_get_name"`` (look up a chat and store
            its ``effective_name``). Any other value is ignored.
        queryset: despite its name, used as a single identifier: it is
            passed to ``get_file`` / ``get_chat`` and used as the result key.
    """
    item = queryset
    if name not in ("admin_get_image", "admin_get_name"):
        return
    try:
        if name == "admin_get_image":
            result = await TgBot.app.bot.get_file(item)
            value = result.file_path
        else:
            result = await TgBot.app.bot.get_chat(item)
            value = result.effective_name
    except Exception:
        # Best effort: the caller gets None, but the failure is no longer silent.
        logger.exception("admin action %s failed for %r", name, item)
        value = None
    TgBotUpdater.return_values[item] = value
async def set_handlers(self):
    """Register all command, message, callback-query and error handlers.

    Handlers are added to ``TgBot.app`` in a fixed order, which is kept
    exactly as listed here.
    """
    register = TgBot.app.add_handler
    private_chat = filters.ChatType.PRIVATE

    for command, callback in (
        ("start", self.start),
        ("my", self.my),
        ("inv", self.inv),
    ):
        register(CommandHandler(command, callback, private_chat))

    # Non-command text or photos sent in a private chat go to the inventory dialogue.
    plain_input = private_chat & ~filters.COMMAND & (filters.TEXT | filters.PHOTO)
    register(MessageHandler(plain_input, self.inv))

    register(CommandHandler("ter", self.ter, private_chat))
    register(CallbackQueryHandler(self.ter_back, "ter_back"))
    register(CallbackQueryHandler(self.ter_next, "ter_next"))
    register(
        MessageHandler(private_chat & filters.Regex(r"/ter_(\d.*)"), self.terdeep)
    )

    for callback, pattern in (
        (self.terdeep_back, "terdeep_back"),
        (self.terdeep_next, "terdeep_next"),
        (self.stop_inv, "stop_inv"),
        (self.get_inv, r"get_inv@(.*?)"),
        (self.add_tmc, r"add_tmc@(.*?)"),
        (self.add_element, r"add_element@(.*?)"),
    ):
        register(CallbackQueryHandler(callback, pattern))

    TgBot.app.add_error_handler(self.error_handler)
async def start(self, update: Update, context: CallbackContext):
    """Reply to ``/start`` with a short help text listing the bot commands.

    The text is sent as MarkdownV2, so the dashes are backslash-escaped.
    The reply quotes the user's message.
    """
    # One entry per line; previously the /inv and /ter lines were glued
    # together because a line break was missing between them.
    help_text = "\n".join(
        (
            "Это бот для проведения инвентаризации",
            "/my \\-\\- продолжить инвентаризацию",
            "/inv \\-\\- начать новую инвентаризацию",
            "/ter \\-\\- список территорий",
        )
    )
    await update.message.reply_markdown_v2(
        help_text,
        reply_parameters=ReplyParameters(message_id=update.message.message_id),
    )
async def my(self, update: Update, context: CallbackContext):
    """List the user's inventories as inline buttons, one per row.

    Each button carries ``get_inv@<id>`` as callback data. If the user has
    no inventories, a plain notice is sent instead.
    """
    user = update.effective_user
    current_step = context.chat_data.get("step", None)
    logger.info(f"Step {current_step} from user {user.full_name}")

    rows = [
        [InlineKeyboardButton(item.name, callback_data=f"get_inv@{item.id}")]
        async for item in TgItem.objects.filter(user_id=user.id)
    ]

    if not rows:
        await update.message.reply_markdown_v2(
            "У вас нет доступных для редактирования инвентаризаций"
        )
        return

    await update.message.reply_markdown_v2(
        "Ваши инвентаризации",
        reply_markup=InlineKeyboardMarkup(rows),
        reply_parameters=ReplyParameters(message_id=update.message.message_id),
    )
def format_username(self, user):
    """Return the header line naming the user and their ID.

    Uses ``user.name`` when it is truthy, otherwise ``user.full_name``.
    """
    display_name = user.name or user.full_name
    return "Специалист {}, ID {}".format(display_name, user.id)
def format_inv(self, inv):
    """Return the line describing an inventory: its name and creation date.

    The date is rendered with ``strftime('%x')``; both values are wrapped
    in backticks.
    """
    created = inv.created_at.strftime("%x")
    return "Инвентаризация `{}` от `{}`".format(inv.name, created)
async def get_tmc_count(self, inv):
    """Return the per-name counts of the inventory's TMC elements.

    Each entry is a row from ``inv.tmc.values("tmc__name")`` annotated with
    ``count``.
    """
    per_name = inv.tmc.values("tmc__name").annotate(
        count=models.Count("tmc__name")
    )
    return [row async for row in per_name]
def format_tmc_count(self, tmc_count):
    """Return the summary line with the TMC totals.

    Args:
        tmc_count: iterable of mappings with ``"tmc__name"`` and ``"count"``
            keys.

    Returns:
        ``"Всего ТМЦ: name \\(count\\), ..."``. The parentheses are
        backslash-escaped because callers send this text as MarkdownV2.
        For an empty input this is the prefix followed by a single space.
    """
    # The backslashes are written as "\\" so the literal holds a real
    # backslash without relying on an invalid escape sequence.
    parts = ", ".join(
        f"{row['tmc__name']} \\({row['count']}\\)" for row in tmc_count
    )
    return f"Всего ТМЦ: {parts}"
def format_tmc_name(self, tmc):
    """Return the line naming the TMC, with the name wrapped in backticks."""
    return "Название ТМЦ `{}`".format(tmc.name)
def format_element(self, field):
    """Return the line naming the element, with the name wrapped in backticks."""
    return "Элемент `{}`".format(field.name)
def stop_inv_button(self):
    """Return a one-button keyboard row that triggers the ``stop_inv`` callback."""
    button = InlineKeyboardButton(
        "❌❌❌ Остановить инвентаризацию", callback_data="stop_inv"
    )
    return [button]
async def stop_inv(self, update: Update, context: CallbackContext):
    """Handle the ``stop_inv`` button.

    Replaces the message's inline keyboard with an empty one, answers the
    callback query and clears all chat data.
    """
    callback = update.callback_query
    empty_markup = InlineKeyboardMarkup([])
    await update.effective_message.edit_reply_markup(empty_markup)
    await callback.answer()
    context.chat_data.clear()
async def get_inv(self, update: Update, context: CallbackContext):
    """Handle a ``get_inv@<id>`` button: resume the chosen inventory.

    The inventory id is the part of the callback data after the last
    ``@``. The inventory is loaded, stored in chat data together with step
    ``"name"``, and the dialogue is re-entered through ``inv``.
    """
    callback = update.callback_query
    await update.effective_message.edit_reply_markup(InlineKeyboardMarkup([]))
    await callback.answer()

    _, _, inv_id = callback.data.rpartition("@")
    stored = await TgItem.objects.aget(id=inv_id)

    context.chat_data["inv"] = stored.id
    context.chat_data["step"] = "name"
    await self.inv(update, context)
async def add_tmc(self, update: Update, context: CallbackContext):
"""Handle an ``add_tmc@<id>`` button: start a new TMC element in the current inventory.

Creates a ``TmcElement`` for the chosen ``CustomTable``, creates ``TmcField``
rows for the table's fields, attaches the element to the inventory stored in
``chat_data["inv"]`` and re-enters ``inv`` with step ``"add_tmc"``.
"""
# NOTE(review): indentation was lost in this copy of the file, so the extent of
# the `if update.callback_query:` body below cannot be determined from here.
query = update.callback_query
if update.callback_query:
# Replace the inline keyboard of the message with an empty one.
await update.effective_message.edit_reply_markup(InlineKeyboardMarkup([]))
await query.answer()
# The table id is the part of the callback data after the last "@".
tmc_id = query.data.split("@")[-1]
tmc = await CustomTable.objects.aget(id=tmc_id)
tmc_element = await TmcElement.objects.acreate(tmc=tmc)
inv = await TgItem.objects.aget(id=context.chat_data["inv"])
# For the table's fields, TmcField rows are created and added to the new
# element (presumably one per loop iteration — loop extent not visible here).
async for f in tmc.fields.all():
tmc_field = await TmcField.objects.acreate(field=f)
await tmc_element.field.aadd(tmc_field)
await inv.tmc.aadd(tmc_element)
await inv.asave()
# Remember the element and switch the dialogue step before re-entering inv().
context.chat_data["tmc"] = tmc_element.id
context.chat_data["step"] = "add_tmc"
await self.inv(update, context)
async def add_element(self, update: Update, context: CallbackContext):
"""Handle an ``add_element@<id>`` button: select which field of the current TMC element to fill.

Looks the field up through the inventory (``chat_data["inv"]``) and the TMC
element (``chat_data["tmc"]``), stores its id in ``chat_data["element"]`` and
re-enters ``inv`` with step ``"add_element"``.
"""
# NOTE(review): indentation was lost in this copy of the file, so the extent of
# the `if update.callback_query:` body below cannot be determined from here.
query = update.callback_query
if update.callback_query:
# Replace the inline keyboard of the message with an empty one.
await update.effective_message.edit_reply_markup(InlineKeyboardMarkup([]))
await query.answer()
inv = await TgItem.objects.aget(id=context.chat_data["inv"])
tmc = await inv.tmc.aget(id=context.chat_data["tmc"])
# The field id is the part of the callback data after the last "@".
field_id = query.data.split("@")[-1]
tmc_field = await tmc.field.aget(id=field_id)
context.chat_data["element"] = tmc_field.id
context.chat_data["step"] = "add_element"
await self.inv(update, context)
def set_step(self, update: Update, context: CallbackContext, prefix="ter"):
    """Reset the paging state for ``prefix`` in the chat data.

    Sets ``step`` to the prefix, ``<prefix>_start`` to 0, ``<prefix>_count``
    to 10 and ``<prefix>_renew`` to False. Other keys are left untouched.
    """
    context.chat_data.update(
        {
            "step": f"{prefix}",
            f"{prefix}_start": 0,
            f"{prefix}_count": 10,
            f"{prefix}_renew": False,
        }
    )
async def step_btn(
    self, update: Update, context: CallbackContext, prefix="ter", step_type="plus"
):
    """Handle a pager button: move the page offset and redraw the list.

    Args:
        update: the update carrying the callback query, which is answered first.
        context: the callback context; paging state lives in ``chat_data``
            under ``<prefix>_start``, ``<prefix>_count`` and ``<prefix>_renew``.
        prefix: the paging namespace and also the name of the method on
            ``self`` that redraws the list.
        step_type: ``"plus"`` moves one page forward, ``"minus"`` one page
            back; any other value leaves the offset unchanged.
    """
    query = update.callback_query
    await query.answer()

    data = context.chat_data
    # The paging keys can be missing, e.g. after stop_inv() cleared the chat
    # data while an old pager message is still on screen; fall back to the
    # same initial values that set_step() uses instead of raising KeyError.
    page_size = data.setdefault(f"{prefix}_count", 10)
    offset = data.setdefault(f"{prefix}_start", 0)
    data[f"{prefix}_renew"] = True

    if step_type == "plus":
        offset += page_size
    elif step_type == "minus":
        # Never go before the first page.
        offset = max(0, offset - page_size)
    data[f"{prefix}_start"] = offset

    redraw = getattr(self, prefix)
    await redraw(update, context)
async def paged_data(
    self, update: Update, context: CallbackContext, prefix="ter", queryset=None
):
    """Send or refresh one page of a selectable object list.

    Every listed object is rendered as ``/<prefix>_<id> <name>``. "Back" and
    "next" inline buttons (callback data ``<prefix>_back`` / ``<prefix>_next``)
    are added when there is a previous or a following page.

    Args:
        update: the incoming update; its effective message is replied to or
            edited.
        context: the callback context; the page window is read from
            ``chat_data`` (``<prefix>_start``, ``<prefix>_count``) and
            ``<prefix>_renew`` selects edit (truthy) versus new reply.
        prefix: the paging namespace used for chat-data keys, command names
            and callback data.
        queryset: objects exposing ``name`` and ``id``; may be an async
            iterable (such as a Django queryset), a plain iterable, or
            ``None`` for an empty list.
    """
    user = update.effective_user
    current_step = context.chat_data.get("step", None)
    logger.info(f"Step {current_step} from user {user.full_name}")

    # Prefer async iteration (querysets support it); accept plain iterables too.
    if queryset is None:
        rows = []
    elif hasattr(queryset, "__aiter__"):
        rows = [e async for e in queryset]
    else:
        rows = list(queryset)
    locations = [{"name": e.name, "id": e.id} for e in rows]

    # Fall back to the initial window when the paging state is missing.
    step_start = context.chat_data.get(f"{prefix}_start", 0)
    step_step = context.chat_data.get(f"{prefix}_count", 10)
    step_end = step_start + step_step

    text = "\n".join(
        [
            self.format_username(user),
            "Выберите объект:",
            "\n".join(
                f"/{prefix}_{i['id']} {i['name']}"
                for i in locations[step_start:step_end]
            ),
        ]
    )

    # The buttons need a visible label; they previously had an empty text.
    buttons = []
    if step_start > 0:
        buttons.append(InlineKeyboardButton(text="⬅️", callback_data=f"{prefix}_back"))
    if step_end < len(locations):
        buttons.append(InlineKeyboardButton(text="➡️", callback_data=f"{prefix}_next"))
    markup = InlineKeyboardMarkup(inline_keyboard=[buttons])

    renew = context.chat_data.get(f"{prefix}_renew", None)
    logger.info(update.message)
    if renew:
        # Paging through an existing list: update the message in place.
        await update.effective_message.edit_text(text, reply_markup=markup)
    else:
        await update.effective_message.reply_text(
            text,
            reply_parameters=ReplyParameters(
                message_id=update.effective_message.message_id
            ),
            reply_markup=markup,
        )
async def ter_back(self, update: Update, context: CallbackContext):
    """Handle the ``ter_back`` button: go one page back in the territory list."""
    await self.step_btn(update, context, prefix="ter", step_type="minus")
async def ter_next(self, update: Update, context: CallbackContext):
    """Handle the ``ter_next`` button: go one page forward in the territory list."""
    await self.step_btn(update, context, prefix="ter", step_type="plus")
async def ter(self, update: Update, context: CallbackContext):
    """Show the paged list of all territories.

    When the update is not a callback query, the paging state for ``ter``
    is reset first.
    """
    from_button = bool(update.callback_query)
    if not from_button:
        self.set_step(update, context, "ter")
    territories = Territory.objects.all()
    await self.paged_data(update, context, "ter", territories)
async def terdeep_back(self, update: Update, context: CallbackContext):
    """Handle the ``terdeep_back`` button: go one page back in the territory item list."""
    await self.step_btn(update, context, prefix="terdeep", step_type="minus")
async def terdeep_next(self, update: Update, context: CallbackContext):
    """Handle the ``terdeep_next`` button: go one page forward in the territory item list."""
    await self.step_btn(update, context, prefix="terdeep", step_type="plus")
async def terdeep(self, update: Update, context: CallbackContext):
    """Show the paged list of territory items below a chosen territory.

    The parent territory id is taken from a ``/ter_<digits>`` command in the
    message text; when the text has no such command (for example when a
    pager button was pressed), the id remembered in ``chat_data["terdeep"]``
    is used.
    """
    if not update.callback_query:
        self.set_step(update, context, "terdeep")

    parent = context.chat_data.get("terdeep", None)

    # Capture only the digits: trailing text (arguments, "@botname") must not
    # leak into the id used for the database filter.
    match = re.search(r"/ter_(\d+)", update.effective_message.text or "")
    if match:
        parent = match.group(1)
        # NOTE(review): the original indentation was lost; both assignments
        # are assumed to belong to this branch, so that pager callbacks keep
        # editing the existing message — confirm against the real file.
        context.chat_data["terdeep"] = parent
        context.chat_data["terdeep_renew"] = False

    await self.paged_data(
        update, context, "terdeep", TerritoryItem.objects.filter(parent_id=parent)
    )
async def inv(self, update: Update, context: CallbackContext):
"""Drive the inventory dialogue as a state machine keyed on ``chat_data["step"]``.

Registered for ``/inv`` and for non-command text/photo messages, and also
re-entered by the callback handlers and by itself after a step changes.
The steps handled here are ``"name"``, ``"add_tmc"``, ``"add_element"`` and
``"add_element_data"``; anything else is only logged.
"""
# NOTE(review): indentation was lost in this copy of the file; the comments
# below describe the statements in order and do not assert exact nesting.
user = update.effective_user
# The step is read once, before any branch changes it.
current_step = context.chat_data.get("step", None)
logger.info(f"Step {current_step} from user {user.full_name}")
# "/inv": switch the stored step to "name" and ask for the object name.
# NOTE(review): `current_step` is not updated here, so the branches below still
# dispatch on the step that was stored before "/inv" arrived — confirm intended.
if update.effective_message.text == "/inv":
context.chat_data["step"] = "name"
await update.effective_message.reply_markdown_v2(
"\n".join([self.format_username(user), f"Введите название объекта"]),
reply_parameters=ReplyParameters(
message_id=update.effective_message.message_id
),
)
# Step "name": create the inventory (or load the one already in chat data),
# then offer the list of TMC tables to choose from.
if current_step == "name":
if not context.chat_data.get("inv", None):
# NOTE(review): the location is a hard-coded TerritoryItem id (35) — confirm.
loc = await TerritoryItem.objects.aget(id=35)
inv = await TgItem.objects.acreate(user_id=user.id, location=loc)
# The text of the incoming message becomes the inventory name.
inv.name = update.message.text
await inv.asave()
else:
inv = await TgItem.objects.aget(id=context.chat_data["inv"])
tmc = []
async for e in CustomTable.objects.all():
tmc.append({"name": e.name, "id": e.id})
# Two table buttons per keyboard row.
keys = chunk(2, tmc)
context.chat_data["inv"] = inv.id
context.chat_data["step"] = "add_tmc"
text = "\n".join(
[
self.format_inv(inv),
self.format_tmc_count(await self.get_tmc_count(inv)),
"Выберите, какую ТМЦ вы осматриваете:",
]
)
# The callback data is "<step>@<id>", i.e. "add_tmc@<table id>" at this point.
keyboard = [
[
InlineKeyboardButton(
i["name"],
callback_data=f'{context.chat_data["step"]}@{i["id"]}',
)
for i in arr
]
for arr in keys
]
keyboard.append(self.stop_inv_button())
await update.effective_message.reply_markdown_v2(
text,
reply_parameters=ReplyParameters(
message_id=update.effective_message.message_id
),
reply_markup=InlineKeyboardMarkup(keyboard),
)
# Step "add_tmc": list the fields of the current TMC element that have neither
# text nor a file yet, one button per row.
elif current_step == "add_tmc":
inv = await TgItem.objects.aget(id=context.chat_data["inv"])
tmc_element = await inv.tmc.aget(id=context.chat_data["tmc"])
tmc = await CustomTable.objects.aget(id=tmc_element.tmc_id)
fields = []
async for e in tmc_element.field.filter(text=None, file_id=None):
# The button label comes from the BaseCustomField; the id is the TmcField's.
f = await BaseCustomField.objects.aget(id=e.field_id)
fields.append({"name": f.name, "id": e.id})
keys = chunk(1, fields)
context.chat_data["tmc"] = tmc_element.id
context.chat_data["step"] = "add_element"
text = "\n".join(
[
self.format_inv(inv),
self.format_tmc_count(await self.get_tmc_count(inv)),
self.format_tmc_name(tmc),
f"Что вы загружаете?",
]
)
# The callback data is "add_element@<TmcField id>" at this point.
keyboard = [
[
InlineKeyboardButton(
i["name"],
callback_data=f'{context.chat_data["step"]}@{i["id"]}',
)
for i in arr
]
for arr in keys
]
keyboard.append(self.stop_inv_button())
await update.effective_message.reply_markdown_v2(
text,
reply_parameters=ReplyParameters(
message_id=update.effective_message.message_id
),
reply_markup=InlineKeyboardMarkup(keyboard),
)
# Step "add_element": a field was chosen; ask the user for a photo or text.
elif current_step == "add_element":
inv = await TgItem.objects.aget(id=context.chat_data["inv"])
tmc_element = await inv.tmc.aget(id=context.chat_data["tmc"])
tmc = await CustomTable.objects.aget(id=tmc_element.tmc_id)
element = await TmcField.objects.aget(id=context.chat_data["element"])
field = await BaseCustomField.objects.aget(id=element.field_id)
context.chat_data["step"] = "add_element_data"
await update.effective_message.reply_markdown_v2(
"\n".join(
[
self.format_inv(inv),
self.format_tmc_count(await self.get_tmc_count(inv)),
self.format_tmc_name(tmc),
self.format_element(field),
f"Загрузите фото или пришлите текст",
]
),
reply_parameters=ReplyParameters(
message_id=update.effective_message.message_id
),
reply_markup=ReplyKeyboardRemove(),
)
# Step "add_element_data": store the received photo or text on the chosen field.
elif current_step == "add_element_data":
inv = await TgItem.objects.aget(id=context.chat_data["inv"])
tmc_element = await inv.tmc.aget(id=context.chat_data["tmc"])
tmc = await CustomTable.objects.aget(id=tmc_element.tmc_id)
element = await TmcField.objects.aget(id=context.chat_data["element"])
field = await BaseCustomField.objects.aget(id=element.field_id)
if update.message.photo:
# The last entry of the photo list is the one whose file_id is stored.
element.file_id = update.message.photo[-1].file_id
elif update.message.text:
element.text = update.message.text
await element.asave()
await inv.asave()
await update.effective_message.reply_markdown_v2(
"Изображение сохранено" if update.message.photo else "Текст сохранен"
)
# Fields still empty: go back to the field list; otherwise back to the table list.
empty_fields = await tmc_element.field.filter(
text=None, file_id=None
).acount()
if empty_fields > 0:
context.chat_data["step"] = "add_tmc"
await self.inv(update, context)
else:
context.chat_data["step"] = "name"
await self.inv(update, context)
else:
# logger.info(update.message.entities)
logger.info(f"no step for update {update}")
# If the stored step is still the one this call started with, reset the dialogue.
if "step" in context.chat_data and context.chat_data["step"] == current_step:
context.chat_data["step"] = None
context.chat_data["inv"] = None
async def error_handler(
self, update: object, context: ContextTypes.DEFAULT_TYPE
) -> None:
"""Log the error and send a telegram message to notify the developer."""
# NOTE(review): within the visible lines the HTML `message` is built but never
# sent; only the log call at the end uses the traceback. The function may
# continue beyond the end of this view — confirm against the full file.
# Log the error before we do anything else, so we can see it even if something breaks.
# logger.error("Exception while handling an update:", exc_info=context.error)
# traceback.format_exception returns the usual python message about an exception, but as a
# list of strings rather than a single string, so we have to join them together.
tb_list = traceback.format_exception(
None, context.error, context.error.__traceback__
)
tb_string = "".join(tb_list)
# Build the message with some markup and additional information about what happened.
# You might need to add some logic to deal with messages longer than the 4096 character limit.
# Non-Update objects are represented by their str() form.
update_str = update.to_dict() if isinstance(update, Update) else str(update)
message = (
"An exception was raised while handling an update\n"
f"<pre>update = {html.escape(json.dumps(update_str, indent=2, ensure_ascii=False))}"
"</pre>\n\n"
f"<pre>context.chat_data = {html.escape(str(context.chat_data))}</pre>\n\n"
f"<pre>context.user_data = {html.escape(str(context.user_data))}</pre>\n\n"
f"<pre>{html.escape(tb_string)}</pre>"
)
# logger.error(context.error)
# The error and its traceback are logged at INFO level.
logger.info(f"error in tgbot {context.error}\n{tb_string}\nReply update")