import traceback
import html
import json
import logging
import re
import more_itertools as mit
from telegram import (
Update,
ReplyParameters,
ReplyKeyboardMarkup,
ReplyKeyboardRemove,
KeyboardButton,
InlineKeyboardMarkup,
InlineKeyboardButton,
)
from telegram.ext import (
ApplicationBuilder,
CommandHandler,
MessageHandler,
CallbackQueryHandler,
CallbackContext,
ContextTypes,
filters,
)
from telegram.constants import ParseMode
from django.conf import settings
from django.urls import reverse
from django.db import models
from .models import TgItem, TmcElement, TmcField
from .tasks import upload_file
from tmc.models import CustomTable, BaseCustomField, Territory, TerritoryItem
logger = logging.getLogger(__name__)
def chunk(n, l):
    """Split sequence ``l`` into consecutive slices of at most ``n`` items.

    Args:
        n: Maximum slice length; must be a positive integer.
        l: Any sliceable sequence that supports ``len()`` (list, tuple, str).

    Returns:
        A list of slices of ``l`` in their original order. The last slice may
        be shorter than ``n``; an empty sequence yields an empty list.

    Raises:
        ValueError: If ``n`` is smaller than 1.
    """
    if n < 1:
        # A negative step makes range() empty, which used to drop every item
        # silently; zero already raised ValueError from range() itself.
        raise ValueError(f"chunk size must be a positive integer, got {n!r}")
    return [l[start : start + n] for start in range(0, len(l), n)]
class TgBot:
_app = None
def __init__(self) -> None:
    """Read the bot settings and build the shared Telegram application.

    Reads ``TOKEN``, ``BASE_URL`` and ``WEBHOOK_URL`` from ``settings.TGBOT``
    and stores the built application on the class as ``TgBot.app``, so every
    instance shares the same application object.

    Raises:
        KeyError: If one of the three keys is missing from ``settings.TGBOT``.
        ValueError: If the token or the base URL is empty.
    """
    config = settings.TGBOT
    self.token = config["TOKEN"]
    self.baseurl = config["BASE_URL"]
    self.webhook = config["WEBHOOK_URL"]
    if not self.token or not self.baseurl:
        # ValueError is still an Exception, so callers that caught the old
        # bare Exception keep working.
        raise ValueError("no token or baseurl")
    # updater(None) builds the application without an Updater; presumably the
    # updates are fed in by the project's own webhook view.
    TgBot.app = ApplicationBuilder().token(self.token).updater(None).build()
async def set_webhook(self):
    """Register this project's update endpoint as the bot's webhook.

    The URL is ``https://<baseurl>`` followed by the path Django resolves for
    the ``tgitem-send-tg-data`` route. Updates still pending on Telegram's
    side are dropped.
    """
    endpoint_path = reverse('tgitem-send-tg-data')
    webhook_url = f"https://{self.baseurl}{endpoint_path}"
    await TgBot.app.bot.setWebhook(webhook_url, drop_pending_updates=True)
async def start_app(self):
    """Initialize the shared application stored on ``TgBot.app``."""
    application = TgBot.app
    await application.initialize()
async def admin_action(self, name, queryset):
    """Run a Telegram lookup for the admin side and publish its result.

    The outcome is written to ``tg_bot_updater_instance.return_values`` under
    the looked-up identifier. ``None`` is stored when the Telegram call fails,
    and the failure is logged instead of being swallowed silently.

    Args:
        name: ``"admin_get_image"`` stores the ``file_path`` returned by
            ``get_file``; ``"admin_get_name"`` stores the ``effective_name``
            returned by ``get_chat``. Any other value is ignored.
        queryset: Despite its name, the single identifier handed to the bot
            call (a file id or a chat id respectively).
    """
    # Imported inside the function as in the original; presumably this avoids
    # a circular import with the updater module.
    from .updater import tg_bot_updater_instance

    item = queryset
    try:
        if name == "admin_get_image":
            value = (await TgBot.app.bot.get_file(item)).file_path
        elif name == "admin_get_name":
            value = (await TgBot.app.bot.get_chat(item)).effective_name
        else:
            return
    except Exception:
        # Best effort: the caller still receives None, but the reason is kept.
        logger.exception("admin action %s failed for %r", name, item)
        value = None
    tg_bot_updater_instance.return_values[item] = value
async def set_handlers(self):
    """Register all command, message and callback handlers and the error handler.

    Handlers are added to the shared application in the order they are listed
    below, which is the same order the original registration used.
    """
    private_chat = filters.ChatType.PRIVATE
    free_input = private_chat & ~filters.COMMAND & (filters.TEXT | filters.PHOTO)

    handlers = [
        CommandHandler("start", self.start, private_chat),
        CommandHandler("my", self.my, private_chat),
        CommandHandler("inv", self.inv, private_chat),
        # Plain text and photos feed the inventory dialogue.
        MessageHandler(free_input, self.inv),
        CommandHandler("ter", self.ter, private_chat),
        CallbackQueryHandler(self.ter_back, "ter_back"),
        CallbackQueryHandler(self.ter_next, "ter_next"),
        MessageHandler(private_chat & filters.Regex(r"/ter_(\d.*)"), self.terdeep),
        CallbackQueryHandler(self.terdeep_back, "terdeep_back"),
        CallbackQueryHandler(self.terdeep_next, "terdeep_next"),
        MessageHandler(
            private_chat & filters.Regex(r"/terdeep_(\d.*)"), self.terdeep_set
        ),
        CallbackQueryHandler(self.stop_inv, "stop_inv"),
        CallbackQueryHandler(self.get_inv, r"get_inv@(.*?)"),
        CallbackQueryHandler(self.add_tmc, r"add_tmc@(.*?)"),
        CallbackQueryHandler(self.add_element, r"add_element@(.*?)"),
    ]
    for handler in handlers:
        TgBot.app.add_handler(handler)
    TgBot.app.add_error_handler(self.error_handler)
async def start(self, update: Update, context: CallbackContext):
    """Reply to the triggering message with the bot's command overview."""
    message = update.effective_message
    overview = (
        "Это бот для проведения инвентаризации\n"
        "/ter — список территорий\n"
        "/inv — начать новую инвентаризацию\n"
        "/my — продолжить инвентаризацию\n"
    )
    reply_to = ReplyParameters(message_id=message.message_id)
    await message.reply_html(overview, reply_parameters=reply_to)
async def my(self, update: Update, context: CallbackContext):
    """Show the caller's inventories as inline buttons, one button per row.

    Every button carries ``get_inv@<id>`` as callback data. When the user owns
    no inventories, a plain notice is sent instead.
    """
    user = update.effective_user
    current_step = context.chat_data.get("step", None)
    logger.info(f"Step {current_step} from user {user.full_name} with data {context.chat_data}")

    owned = [
        {"name": item.name, "id": str(item.id)}
        async for item in TgItem.objects.filter(user_id=user.id)
    ]
    rows = chunk(1, owned)

    if not owned:
        await update.message.reply_html(
            "У вас нет доступных для редактирования инвентаризаций"
        )
        return

    buttons = [
        [
            InlineKeyboardButton(entry["name"], callback_data=f'get_inv@{entry["id"]}')
            for entry in row
        ]
        for row in rows
    ]
    await update.message.reply_html(
        ("Ваши инвентаризации"),
        reply_markup=InlineKeyboardMarkup(buttons),
        reply_parameters=ReplyParameters(message_id=update.message.message_id),
    )
def format_username(self, user):
    """Return the one-line specialist header used at the top of bot messages.

    The name comes from ``user.name`` and falls back to ``user.full_name``. It
    is HTML-escaped because the callers visible in this file send the result
    with HTML parsing enabled, and the name is chosen by the Telegram user.

    Args:
        user: Object exposing ``name``, ``full_name`` and ``id``.

    Returns:
        The header line terminated by a newline.
    """
    # NOTE(review): the literal was split across two lines in the source
    # (markup may have been lost); it is restored as a plain trailing newline.
    display_name = html.escape(str(user.name or user.full_name))
    return f"Специалист {display_name}, ID {user.id}\n"
def format_inv(self, inv):
    """Return the one-line inventory header: its name and creation date.

    The name is HTML-escaped because it is typed by the user and the callers
    visible in this file send the result with HTML parsing enabled.

    Args:
        inv: Object exposing ``name`` and a ``created_at`` datetime.

    Returns:
        The header line terminated by a newline; the date uses the
        locale-dependent ``%x`` format.
    """
    # NOTE(review): the literal was split across two lines in the source
    # (markup may have been lost); it is restored as a plain trailing newline.
    created = inv.created_at.strftime("%x")
    return f"Инвентаризация {html.escape(str(inv.name))} от {created}\n"
async def get_tmc_count(self, inv):
    """Count the inventory's elements grouped by ``tmc__name``.

    Returns:
        A list of dicts with the keys ``tmc__name`` and ``count``, as produced
        by the ``values().annotate()`` query on ``inv.tmc``.
    """
    per_name = inv.tmc.values("tmc__name").annotate(count=models.Count("tmc__name"))
    return [row async for row in per_name]
def format_tmc_count(self, tmc_count):
    """Render per-name counts as a single summary line.

    Args:
        tmc_count: Iterable of dicts with the keys ``tmc__name`` and ``count``.

    Returns:
        The caption followed by comma-separated ``name (count)`` pairs.
    """
    pairs = ", ".join(f"{row['tmc__name']} ({row['count']})" for row in tmc_count)
    return " ".join(("Всего ТМЦ:", pairs))
def format_tmc_name(self, tmc):
    """Return the one-line header naming the item type under inspection.

    The name is HTML-escaped because the callers visible in this file send
    the result with HTML parsing enabled.

    Args:
        tmc: Object exposing ``name``.

    Returns:
        The header line terminated by a newline.
    """
    # NOTE(review): the literal was split across two lines in the source
    # (markup may have been lost); it is restored as a plain trailing newline.
    return f"Название ТМЦ {html.escape(str(tmc.name))}\n"
def format_element(self, field):
    """Return the one-line header naming the field being filled in.

    The name is HTML-escaped because the callers visible in this file send
    the result with HTML parsing enabled.

    Args:
        field: Object exposing ``name``.

    Returns:
        The header line terminated by a newline.
    """
    # NOTE(review): the literal was split across two lines in the source
    # (markup may have been lost); it is restored as a plain trailing newline.
    return f"Элемент {html.escape(str(field.name))}\n"
def stop_inv_button(self):
    """Return a keyboard row holding the single button that fires ``stop_inv``."""
    stop_button = InlineKeyboardButton(
        "❌❌❌ Остановить инвентаризацию", callback_data="stop_inv"
    )
    return [stop_button]
async def stop_inv(self, update: Update, context: CallbackContext):
    """Abort the running inventory dialogue and show the command overview.

    The inline keyboard is removed from the message, the callback query is
    answered and all chat state is cleared.
    """
    callback = update.callback_query
    no_buttons = InlineKeyboardMarkup([])
    await update.effective_message.edit_reply_markup(no_buttons)
    await callback.answer()
    context.chat_data.clear()
    await self.start(update, context)
async def get_inv(self, update: Update, context: CallbackContext):
    """Resume the inventory whose id follows the last ``@`` in the callback data.

    The inventory's location and id are stored in the chat state, the step is
    set to ``"name"`` and control is handed to ``inv``.
    """
    callback = update.callback_query
    await update.effective_message.edit_reply_markup(InlineKeyboardMarkup([]))
    await callback.answer()

    inv_id = callback.data.rpartition("@")[-1]
    selected = await TgItem.objects.aget(id=inv_id)

    state = context.chat_data
    state["terdeep_value"] = selected.location_id
    state["inv"] = selected.id
    state["step"] = "name"
    await self.inv(update, context)
async def add_tmc(self, update: Update, context: CallbackContext):
    """Attach a new element of the chosen item type to the current inventory.

    The item type id follows the last ``@`` in the callback data. A
    ``TmcElement`` is created with one empty ``TmcField`` per field of the
    type, linked to the inventory stored in ``chat_data["inv"]``, and the
    dialogue continues in ``inv`` with the step set to ``"add_tmc"``.

    Raises:
        KeyError: If ``chat_data`` holds no ``"inv"`` entry.
    """
    query = update.callback_query
    if query:
        await update.effective_message.edit_reply_markup(InlineKeyboardMarkup([]))
    await query.answer()

    tmc_id = query.data.split("@")[-1]
    tmc = await CustomTable.objects.aget(id=tmc_id)
    # Resolve the inventory before creating any rows: if the session state is
    # missing or stale this fails first and leaves no orphan TmcElement.
    inv = await TgItem.objects.aget(id=context.chat_data["inv"])

    tmc_element = await TmcElement.objects.acreate(tmc=tmc)
    async for f in tmc.fields.all():
        tmc_field = await TmcField.objects.acreate(field=f)
        await tmc_element.field.aadd(tmc_field)
    await inv.tmc.aadd(tmc_element)
    await inv.asave()

    context.chat_data["tmc"] = tmc_element.id
    context.chat_data["step"] = "add_tmc"
    await self.inv(update, context)
async def add_element(self, update: Update, context: CallbackContext):
    """Select which field of the current element the user fills in next.

    The field id follows the last ``@`` in the callback data and is looked up
    among the fields of the element stored in ``chat_data["tmc"]``. The step
    becomes ``"add_element"`` and control is handed to ``inv``.
    """
    callback = update.callback_query
    if callback:
        await update.effective_message.edit_reply_markup(InlineKeyboardMarkup([]))
    await callback.answer()

    state = context.chat_data
    inventory = await TgItem.objects.aget(id=state["inv"])
    current_element = await inventory.tmc.aget(id=state["tmc"])
    chosen_id = callback.data.rpartition("@")[-1]
    chosen_field = await current_element.field.aget(id=chosen_id)

    state["element"] = chosen_field.id
    state["step"] = "add_element"
    await self.inv(update, context)
def set_step(self, update: Update, context: CallbackContext, prefix="ter"):
    """Reset the paging state for ``prefix`` and make it the current step.

    Stores offset 0, a page size of 10 and a cleared renew flag under the
    ``<prefix>_start``, ``<prefix>_count`` and ``<prefix>_renew`` keys.
    """
    context.chat_data.update(
        {
            "step": f"{prefix}",
            f"{prefix}_start": 0,
            f"{prefix}_count": 10,
            f"{prefix}_renew": False,
        }
    )
async def step_btn(
    self, update: Update, context: CallbackContext, prefix="ter", step_type="plus"
):
    """Move the paged list for ``prefix`` one page and redraw it in place.

    Args:
        update: Update carrying the callback query of the pressed button.
        context: Handler context whose ``chat_data`` holds the paging state.
        prefix: Name of the list; also the name of the method that renders it.
        step_type: ``"plus"`` for the next page, ``"minus"`` for the previous
            one. Any other value redraws the current page.
    """
    query = update.callback_query
    await query.answer()

    state = context.chat_data
    start_key = f"{prefix}_start"
    count_key = f"{prefix}_count"
    # Fall back to the values set_step() uses, so a button on a message that
    # outlived the stored chat state does not fail with KeyError.
    page_size = state.get(count_key, 10)
    offset = state.get(start_key, 0)

    if step_type == "plus":
        offset += page_size
    elif step_type == "minus":
        # Clamp at zero: a double tap on the back button must not produce a
        # negative offset, which would slice an empty page.
        offset = max(0, offset - page_size)

    state[start_key] = offset
    state[count_key] = page_size
    state[f"{prefix}_renew"] = True

    render = getattr(self, prefix)
    await render(update, context)
async def paged_data(
    self, update: Update, context: CallbackContext, prefix="ter", queryset=None
):
    """Send or redraw one page of a selectable object list.

    Every entry is shown as ``/<prefix>_<id> <name>``. Arrow buttons are added
    when a previous or a next page exists. The page window is read from
    ``chat_data["<prefix>_start"]`` and ``chat_data["<prefix>_count"]``.

    Args:
        update: Update that triggered the listing.
        context: Handler context holding the paging state.
        prefix: Command and state-key prefix of this list.
        queryset: Async-iterable (or plain iterable) of objects exposing
            ``name`` and ``id``. ``None`` is treated as an empty list.
    """
    user = update.effective_user
    current_step = context.chat_data.get("step", None)
    logger.info(f"Step {current_step} from user {user.full_name} with data {context.chat_data}")

    # The old default was a shared mutable list that "async for" cannot even
    # iterate. The queryset is never truth-tested here, since that would
    # evaluate it synchronously.
    if queryset is None:
        locations = []
    elif hasattr(queryset, "__aiter__"):
        locations = [{"name": e.name, "id": e.id} async for e in queryset]
    else:
        locations = [{"name": e.name, "id": e.id} for e in queryset]

    step_start = context.chat_data[f"{prefix}_start"]
    step_end = step_start + context.chat_data[f"{prefix}_count"]

    # Names are escaped because the text is sent with HTML parsing enabled.
    entries = "\n".join(
        f"/{prefix}_{i['id']} {html.escape(str(i['name']))}"
        for i in locations[step_start:step_end]
    )
    text = "\n".join([self.format_username(user), "Выберите объект:", entries])

    arrows = []
    if step_start > 0:
        arrows.append(InlineKeyboardButton(text="←", callback_data=f"{prefix}_back"))
    if step_end < len(locations):
        arrows.append(InlineKeyboardButton(text="→", callback_data=f"{prefix}_next"))
    keyboard = [arrows]

    renew = context.chat_data.get(f"{prefix}_renew", None)
    logger.info(update.message)
    if renew:
        # Paging via a button: update the existing message in place.
        await update.effective_message.edit_text(
            text,
            reply_markup=InlineKeyboardMarkup(inline_keyboard=keyboard),
            parse_mode=ParseMode.HTML,
        )
    else:
        await update.effective_message.reply_html(
            text,
            reply_parameters=ReplyParameters(
                message_id=update.effective_message.message_id
            ),
            reply_markup=InlineKeyboardMarkup(inline_keyboard=keyboard),
        )
async def ter_back(self, update: Update, context: CallbackContext):
    """Show the previous page of the territory list."""
    await self.step_btn(update, context, prefix="ter", step_type="minus")
async def ter_next(self, update: Update, context: CallbackContext):
    """Show the next page of the territory list."""
    await self.step_btn(update, context, prefix="ter", step_type="plus")
async def ter(self, update: Update, context: CallbackContext):
    """List all territories, one page at a time.

    The paging state is reset unless the update carries a callback query,
    which is the case when a paging button re-enters this method.
    """
    arrived_without_button = not update.callback_query
    if arrived_without_button:
        self.set_step(update, context, "ter")
    territories = Territory.objects.all()
    await self.paged_data(update, context, "ter", territories)
async def terdeep_back(self, update: Update, context: CallbackContext):
    """Show the previous page of the territory item list."""
    await self.step_btn(update, context, prefix="terdeep", step_type="minus")
async def terdeep_next(self, update: Update, context: CallbackContext):
    """Show the next page of the territory item list."""
    await self.step_btn(update, context, prefix="terdeep", step_type="plus")
async def terdeep(self, update: Update, context: CallbackContext):
    """List the items of the territory selected with ``/ter_<id>``.

    On a command message the paging state is reset and the territory id is
    taken from the message text and remembered in ``chat_data["terdeep"]``.
    When re-entered through a paging button, the remembered id is reused.
    """
    parent = context.chat_data.get("terdeep", None)
    if not update.callback_query:
        self.set_step(update, context, "terdeep")
        # Only the digits are captured: the previous pattern (\d.*) also took
        # any trailing characters, which then failed as a parent_id value.
        # The text is only inspected for command messages; on a button press
        # it belongs to the bot's own list message.
        match = re.search(r"/ter_(\d+)", update.effective_message.text or "")
        if match:
            parent = match.group(1)
            context.chat_data["terdeep"] = parent
            context.chat_data["terdeep_renew"] = False
    await self.paged_data(
        update, context, "terdeep", TerritoryItem.objects.filter(parent_id=parent)
    )
async def terdeep_set(self, update: Update, context: CallbackContext):
    """Remember the territory item chosen with ``/terdeep_<id>``.

    The id is stored as a string in ``chat_data["terdeep_value"]`` and the
    user is told that ``/inv`` can now be started.
    """
    message = update.effective_message
    # Only the digits are captured: the previous pattern (\d.*) also took any
    # trailing characters, which later failed as an id lookup value.
    match = re.search(r"/terdeep_(\d+)", message.text or "")
    if match is None:
        # Not reachable through the registered filter; guards direct calls.
        logger.warning("terdeep_set called without a /terdeep_<id> command")
        return
    context.chat_data["terdeep_value"] = match.group(1)

    text = (
        "Вы выбрали территорию инвентаризации\n"
        "Теперь вы можете начать инвентаризацию /inv"
    )
    await message.reply_html(
        text=text,
        reply_parameters=ReplyParameters(message_id=message.message_id),
    )
async def inv(self, update: Update, context: CallbackContext):
    """Advance the inventory dialogue by one step.

    The step is read from ``chat_data["step"]``:

    * ``/inv`` command: forget the current inventory and ask for a name.
    * ``"name"``: create the inventory from the message text (or load the one
      in ``chat_data["inv"]``) and offer the item types.
    * ``"add_tmc"``: offer the still-empty fields of the current element.
    * ``"add_element"``: ask for a photo or a text for the chosen field.
    * ``"add_element_data"``: store the photo and/or text, then continue with
      the remaining fields, or with the item types when none are left.

    A territory item must already be selected (``chat_data["terdeep_value"]``),
    otherwise the user is pointed to ``/ter``.
    """
    user = update.effective_user
    # effective_message is also set for edited messages and callback queries,
    # where update.message is None.
    message = update.effective_message
    state = context.chat_data
    current_step = state.get("step", None)
    logger.info(f"Step {current_step} from user {user.full_name} with data {context.chat_data}")
    reply_to = ReplyParameters(message_id=message.message_id)

    if not state.get("terdeep_value", None):
        await message.reply_html(
            text=("Вы не выбрали территорию /ter"),
            reply_parameters=reply_to,
        )
        return

    if message.text == "/inv":
        # pop() rather than del: on the first /inv there is no "inv" key yet.
        state.pop("inv", None)
        state["step"] = "name"
        current_ter = await TerritoryItem.objects.aget(id=state.get("terdeep_value", None))
        await message.reply_html(
            "\n".join([self.format_username(user), "Введите название объекта"]),
            reply_parameters=reply_to,
            reply_markup=ReplyKeyboardMarkup([[KeyboardButton(current_ter.name)]]),
        )
        # Stop here: falling through would run the branch of the previous step
        # with the command text, e.g. create an inventory named "/inv".
        return

    if current_step == "name":
        if not state.get("inv", None):
            current_ter = await TerritoryItem.objects.aget(id=state.get("terdeep_value", None))
            inv = await TgItem.objects.acreate(user_id=user.id, location=current_ter)
            inv.name = message.text
            await inv.asave()
            # The name is user input and the reply is parsed as HTML.
            await message.reply_html(
                f"Ок, сохранено название {html.escape(str(inv.name))}",
                reply_markup=ReplyKeyboardRemove(),
            )
        else:
            inv = await TgItem.objects.aget(id=state["inv"])

        tmc = [{"name": e.name, "id": e.id} async for e in CustomTable.objects.all()]
        state["inv"] = inv.id
        state["step"] = "add_tmc"
        text = "\n".join(
            [
                self.format_inv(inv),
                self.format_tmc_count(await self.get_tmc_count(inv)),
                "Выберите, какую ТМЦ вы осматриваете:",
            ]
        )
        await message.reply_html(
            text,
            reply_parameters=reply_to,
            reply_markup=self._choice_keyboard(state["step"], tmc, 2),
        )
    elif current_step == "add_tmc":
        inv = await TgItem.objects.aget(id=state["inv"])
        tmc_element = await inv.tmc.aget(id=state["tmc"])
        tmc = await CustomTable.objects.aget(id=tmc_element.tmc_id)
        fields = []
        # Only fields that have neither text nor a file yet are offered.
        async for e in tmc_element.field.filter(text=None, file_id=None):
            f = await BaseCustomField.objects.aget(id=e.field_id)
            fields.append({"name": f.name, "id": e.id})
        state["tmc"] = tmc_element.id
        state["step"] = "add_element"
        text = "\n".join(
            [
                self.format_inv(inv),
                self.format_tmc_count(await self.get_tmc_count(inv)),
                self.format_tmc_name(tmc),
                "Что вы загружаете?",
            ]
        )
        await message.reply_html(
            text,
            reply_parameters=reply_to,
            reply_markup=self._choice_keyboard(state["step"], fields, 1),
        )
    elif current_step == "add_element":
        inv = await TgItem.objects.aget(id=state["inv"])
        tmc_element = await inv.tmc.aget(id=state["tmc"])
        tmc = await CustomTable.objects.aget(id=tmc_element.tmc_id)
        element = await TmcField.objects.aget(id=state["element"])
        field = await BaseCustomField.objects.aget(id=element.field_id)
        state["step"] = "add_element_data"
        await message.reply_html(
            "\n".join(
                [
                    self.format_inv(inv),
                    self.format_tmc_count(await self.get_tmc_count(inv)),
                    self.format_tmc_name(tmc),
                    self.format_element(field),
                    "Загрузите фото или пришлите текст",
                ]
            ),
            reply_parameters=reply_to,
            reply_markup=ReplyKeyboardRemove(),
        )
    elif current_step == "add_element_data":
        inv = await TgItem.objects.aget(id=state["inv"])
        tmc_element = await inv.tmc.aget(id=state["tmc"])
        element = await TmcField.objects.aget(id=state["element"])
        if message.photo:
            # The last entry of the photo list is stored and handed to the
            # upload task.
            element.file_id = message.photo[-1].file_id
            upload_file.delay(element.file_id)
            if message.caption:
                element.text = message.caption
        if message.text:
            element.text = message.text
        await element.asave()
        await inv.asave()
        await message.reply_html(
            "Изображение сохранено" if message.photo else "Текст сохранен"
        )
        empty_fields = await tmc_element.field.filter(
            text=None, file_id=None
        ).acount()
        # More empty fields: offer them again; otherwise back to the item types.
        state["step"] = "add_tmc" if empty_fields > 0 else "name"
        await self.inv(update, context)
    else:
        logger.info(f"no step for update {update}")

    # A step that did not advance means the input was not understood: reset.
    if "step" in state and state["step"] == current_step:
        state["step"] = None
        state["inv"] = None

def _choice_keyboard(self, step, items, per_row):
    """Build ``<step>@<id>`` inline buttons, ``per_row`` per row, plus the stop row."""
    rows = [
        [
            InlineKeyboardButton(i["name"], callback_data=f'{step}@{i["id"]}')
            for i in arr
        ]
        for arr in chunk(per_row, items)
    ]
    rows.append(self.stop_inv_button())
    return InlineKeyboardMarkup(rows)
async def error_handler(
    self, update: object, context: ContextTypes.DEFAULT_TYPE
) -> None:
    """Log the error raised while handling an update.

    An HTML-formatted report (update, chat data, user data, traceback) is
    assembled as well; the code that would deliver it is not visible here.
    """
    # traceback.format_exception returns the usual python message about an exception, but as a
    # list of strings rather than a single string, so we have to join them together.
    tb_list = traceback.format_exception(
        None, context.error, context.error.__traceback__
    )
    tb_string = "".join(tb_list)

    # Build the message with some markup and additional information about what happened.
    # You might need to add some logic to deal with messages longer than the 4096 character limit.
    update_str = update.to_dict() if isinstance(update, Update) else str(update)
    # NOTE(review): the string literals were split across lines in the source;
    # the <pre> tags are restored following the upstream python-telegram-bot
    # error-handler example that this code mirrors. Confirm against the
    # original file.
    message = (
        "An exception was raised while handling an update\n"
        f"<pre>update = {html.escape(json.dumps(update_str, indent=2, ensure_ascii=False))}"
        "</pre>\n\n"
        f"<pre>context.chat_data = {html.escape(str(context.chat_data))}</pre>\n\n"
        f"<pre>context.user_data = {html.escape(str(context.user_data))}</pre>\n\n"
        f"<pre>{html.escape(tb_string)}</pre>"
    )
    # NOTE(review): in the source this call was fused onto a comment line; it
    # is restored as a statement. Confirm against the original file.
    logger.info(f"error in tgbot {context.error}\n{tb_string}\nReply update")