base tg updater

This commit is contained in:
Kseninia Mikhaylova 2024-07-16 16:25:09 +03:00
parent 1b310dbfbe
commit 2916826bfb
10 changed files with 192 additions and 133 deletions

View File

@ -27,7 +27,8 @@ SECRET_KEY = "django-insecure-ruo!wst&sb8(f9)j5u4rda-w!673lj_-c0a%gx_t@)ff*q*2ze
# SECURITY WARNING: don't run with debug turned on in production!
DEBUG = True
NGROK_TEMP = "da22-193-228-134-167.ngrok-free.app"
NGROK_TEMP = os.environ.get("NGROK_TEMP")
ALLOWED_HOSTS = [
"localhost",
NGROK_TEMP,
@ -187,7 +188,7 @@ LOGGING = {
ODATA_AUTH = os.environ.get("ODATA_AUTH")
TGBOT = {
"token": os.environ.get("TG_TOKEN"),
"base_url": NGROK_TEMP,
"webhook": "webhook",
"TOKEN": os.environ.get("TG_TOKEN"),
"BASE_URL": NGROK_TEMP,
"WEBHOOK_URL": f"api/tgbot/webhook/{os.environ.get('TG_TOKEN')}",
}

View File

@ -14,6 +14,7 @@ Including another URLconf
1. Import the include() function: from django.urls import include, path
2. Add a URL to urlpatterns: path('blog/', include('blog.urls'))
"""
from django.conf import settings
from django.contrib import admin
from django.urls import include, path
from rest_framework import routers
@ -26,9 +27,9 @@ router = routers.DefaultRouter()
router.register(r'api/partner', views.PartnerViewSet)
router.register(r'api/element', views.ElementViewSet)
router.register(r'api/inventory', views.InventoryItemViewSet)
router.register(r'api/tgbot', tgbot_views.ItemViewSet)
router.register(r'api/tmc/fields', tmc_views.BaseCustomFieldViewSet)
router.register(r'api/tmc/items', tmc_views.CustomTableViewSet)
router.register(settings.TGBOT['WEBHOOK_URL'], tgbot_views.TgItemViewSet)
urlpatterns = [
path('', include(router.urls)),

13
back/poetry.lock generated
View File

@ -347,6 +347,17 @@ files = [
docs = ["mdx-gh-links (>=0.2)", "mkdocs (>=1.5)", "mkdocs-gen-files", "mkdocs-literate-nav", "mkdocs-nature (>=0.6)", "mkdocs-section-index", "mkdocstrings[python]"]
testing = ["coverage", "pyyaml"]
[[package]]
name = "more-itertools"
version = "10.3.0"
description = "More routines for operating on iterables, beyond itertools"
optional = false
python-versions = ">=3.8"
files = [
{file = "more-itertools-10.3.0.tar.gz", hash = "sha256:e5d93ef411224fbcef366a6e8ddc4c5781bc6359d43412a65dd5964e46111463"},
{file = "more_itertools-10.3.0-py3-none-any.whl", hash = "sha256:ea6a02e24a9161e51faad17a8782b92a0df82c12c1c8886fec7f0c3fa1a1b320"},
]
[[package]]
name = "mslex"
version = "1.2.0"
@ -750,4 +761,4 @@ zstd = ["zstandard (>=0.18.0)"]
[metadata]
lock-version = "2.0"
python-versions = "^3.10"
content-hash = "b0a8df55d3a7a429a89e692c2f74d3f41144a44fb1da3b108105ac9353eab7db"
content-hash = "aa6963a661c46f7e6fe01a15a736d3c401ddbf263dd19bfed36a8c003199a1c0"

View File

@ -17,6 +17,7 @@ python-dotenv = "^1.0.1"
requests = "^2.32.2"
django-cors-headers = "^4.3.1"
python-telegram-bot = {extras = ["job-queue"], version = "^21.3"}
more-itertools = "^10.3.0"
[tool.poetry.group.dev.dependencies]
taskipy = "^1.12.2"

View File

@ -1,6 +1,6 @@
from django.contrib import admin
from .models import Item, Element
from .models import TgItem, Element
# Register your models here.
admin.site.register(Item)
admin.site.register(TgItem)
admin.site.register(Element)

View File

@ -1,59 +1,88 @@
from asgiref.sync import async_to_sync
from django.apps import AppConfig
import asyncio
import threading
import queue
import time
import os
class TgBotClass(AppConfig):
default_auto_field = "django.db.models.BigAutoField"
name = "tgbot"
class TgBotUpdater:
is_run = False
app = None
tgbot_class = None
update_queue = None
my_queue = queue.Queue()
# @async_to_sync
async def init_bot(self):
from django.conf import settings
from .tgbot import TgBotApp
import logging
tgbot = TgBotApp()
app = await tgbot.init_tg()
await tgbot.set_webhook(
f"https://{settings.TGBOT['base_url']}/api/tgbot/",
logger = logging.getLogger(__name__)
def __init__(self) -> None:
self.loop = asyncio.get_event_loop()
try:
from .tgbot import TgBot
self.tgbot_class = TgBot()
except Exception as e:
self.logger.error(e)
TgBotUpdater.is_run = True
async def _set_webhook(self):
await self.tgbot_class.set_webhook()
async def _start_app(self):
await self.tgbot_class.start_app()
async def _set_hadlers(self):
await self.tgbot_class.set_handlers()
async def _run_func(self):
from .tgbot import TgBot
while hasattr(TgBot, "app"):
# self.logger.info(f"check updates in {await TgBot.app.bot.get_webhook_info()}")
if not TgBotUpdater.my_queue.empty():
item = TgBotUpdater.my_queue.get()
if (
isinstance(item, dict)
and "name" in item
and item["name"].startswith("admin_")
):
await self.tgbot_class.admin_action(item["name"], item["queryset"])
else:
try:
await TgBot.app.process_update(item)
except Exception as e:
print(f"Error in tg thread {e}")
await TgBot.app.process_update(item)
TgBotUpdater.my_queue.task_done()
await asyncio.sleep(3)
async def main(self):
await asyncio.gather(
self._set_webhook(),
self._set_hadlers(),
self._start_app(),
self._run_func(),
)
return app
async def some_function(self=None):
while True:
if not TgBotClass.my_queue.empty():
item = TgBotClass.my_queue.get()
await TgBotClass.app.process_update(item)
TgBotClass.my_queue.task_done()
time.sleep(1)
def run_func(self):
asyncio.set_event_loop(self.loop)
self.loop.run_until_complete(self.main())
self.loop.close()
async def some_callback():
await TgBotClass.some_function()
def between_callback():
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
loop.run_until_complete(TgBotClass.some_callback())
loop.close()
class TgbotConfig(AppConfig):
default_auto_field = "django.db.models.BigAutoField"
name = "tgbot"
def ready(self):
import os
if not TgBotUpdater.is_run and os.environ.get("RUN_MAIN", None) == "true":
threading.Thread(
target=(TgBotUpdater().run_func),
name="tg_updater_thread",
daemon=True,
).start()
return super().ready()
if os.environ.get("RUN_MAIN", None) != "true":
return
if TgBotClass.is_run:
return
TgBotClass.is_run = True
TgBotClass.app = async_to_sync(self.init_bot, force_new_loop=True)()
thread = threading.Thread(target=TgBotClass.between_callback)
thread.setDaemon(True)
thread.start()

View File

@ -25,7 +25,7 @@ class Element(models.Model):
def __str__(self):
return f"Element {self.tmc} {self.name}"
class Item(models.Model):
class TgItem(models.Model):
id = models.UUIDField(
auto_created=True,
primary_key=True,

View File

@ -1,12 +1,12 @@
from rest_framework import serializers
from .models import Item
from .models import TgItem
import logging
logger = logging.getLogger("root")
class ItemSerializer(serializers.ModelSerializer):
class TgItemSerializer(serializers.ModelSerializer):
class Meta:
model = Item
model = TgItem
fields = '__all__'

View File

@ -1,41 +1,73 @@
from django.conf import settings
from asgiref.sync import sync_to_async
import traceback
import html
import json
import logging
import re
import more_itertools as mit
from telegram import (
ForceReply,
Update,
ReplyParameters,
ReplyKeyboardMarkup,
ReplyKeyboardRemove,
KeyboardButton,
InlineKeyboardMarkup,
InlineKeyboardButton,
MessageEntity,
)
from telegram.ext import (
Application,
ApplicationBuilder,
CommandHandler,
MessageHandler,
filters,
CallbackQueryHandler,
CallbackContext,
ContextTypes,
filters,
)
from telegram.constants import ParseMode, ChatType, MessageEntityType
from telegram.constants import ParseMode
from .apps import TgBotClass
from .models import Element, Item
from tmc.models import CustomTable, BaseCustomField
from django.conf import settings
import logging
from .apps import TgBotUpdater
from .models import Element, TgItem
from tmc.models import CustomTable
logger = logging.getLogger("root")
logger = logging.getLogger(__name__)
def chunk(n, l):
return [l[i : i + n] for i in range(0, len(l), n)]
class TgBotApp:
class TgBot:
_app = None
def __init__(self) -> None:
self.token = settings.TGBOT["TOKEN"]
self.baseurl = settings.TGBOT["BASE_URL"]
self.webhook = settings.TGBOT["WEBHOOK_URL"]
if not self.token or not self.baseurl:
raise Exception("no token or baseurl")
TgBot.app = (
ApplicationBuilder().token(settings.TGBOT["TOKEN"]).updater(None).build()
)
async def set_webhook(self):
await TgBot.app.bot.setWebhook(
f"https://{self.baseurl}/{self.webhook}/", drop_pending_updates=True
)
async def start_app(self):
await TgBot.app.initialize()
async def admin_action(self, name, queryset):
pass
async def set_handlers(self):
TgBot.app.add_handler(CommandHandler("my", self.my, filters.ChatType.PRIVATE))
TgBot.app.add_handler(CommandHandler("inv", self.inv, filters.ChatType.PRIVATE))
TgBot.app.add_handler(MessageHandler(filters.ChatType.PRIVATE, self.inv))
TgBot.app.add_error_handler(self.error_handler)
async def start(self, update: Update, context: CallbackContext):
await update.message.reply_markdown_v2(
(
@ -75,13 +107,13 @@ class TgBotApp:
logger.info(f"Step {current_step} from user {user.full_name}")
if current_step == "get_inv":
inv = await Item.objects.aget(name=update.message.text)
inv = await TgItem.objects.aget(name=update.message.text)
context.chat_data["inv"] = inv.id
context.chat_data["step"] = "name"
current_step = context.chat_data["step"]
if not current_step and update.message.text == "/inv":
inv = await Item.objects.acreate(user_id=user.id)
inv = await TgItem.objects.acreate(user_id=user.id)
await update.message.reply_markdown_v2(
(
f"Специалист {user.name or user.full_name}, ID {user.id}\n"
@ -94,7 +126,7 @@ class TgBotApp:
context.chat_data["step"] = "name"
elif current_step == "name":
inv = await Item.objects.aget(id=context.chat_data["inv"])
inv = await TgItem.objects.aget(id=context.chat_data["inv"])
inv.name = update.message.text
await inv.asave()
@ -121,7 +153,7 @@ class TgBotApp:
tmc_name = update.message.text
tmc = await CustomTable.objects.aget(name=tmc_name)
inv = await Item.objects.aget(id=context.chat_data["inv"])
inv = await TgItem.objects.aget(id=context.chat_data["inv"])
await inv.tmc.aadd(tmc)
fields = []
@ -149,7 +181,7 @@ class TgBotApp:
element_name = update.message.text
element = await Element.objects.acreate(name=element_name, tmc=tmc)
inv = await Item.objects.aget(id=context.chat_data["inv"])
inv = await TgItem.objects.aget(id=context.chat_data["inv"])
await inv.element.aadd(element)
await update.message.reply_markdown_v2(
@ -166,7 +198,7 @@ class TgBotApp:
context.chat_data["element"] = element.id
context.chat_data["step"] = "add_field_data"
elif current_step == "add_field_data":
inv = await Item.objects.aget(id=context.chat_data["inv"])
inv = await TgItem.objects.aget(id=context.chat_data["inv"])
tmc = await CustomTable.objects.aget(id=inv.tmc)
element = await Element.objects.aget(id=inv.element)
@ -183,7 +215,7 @@ class TgBotApp:
f"Название объекта `{inv.name}`\n"
f"Название ТМЦ `{tmc.name}`\n"
f"Название элемента `{element.name}`\n"
f"Загрузите фото или пришлите текст"
f"Данные загружены"
),
reply_parameters=ReplyParameters(message_id=update.message.message_id),
reply_markup=ReplyKeyboardRemove(),
@ -198,43 +230,31 @@ class TgBotApp:
context.chat_data["step"] = None
context.chat_data["inv"] = None
async def error(self, update: Update, context: CallbackContext):
async def error_handler(
self, update: object, context: ContextTypes.DEFAULT_TYPE
) -> None:
"""Log the error and send a telegram message to notify the developer."""
# Log the error before we do anything else, so we can see it even if something breaks.
logger.error("Exception while handling an update:", exc_info=context.error)
# traceback.format_exception returns the usual python message about an exception, but as a
# list of strings rather than a single string, so we have to join them together.
tb_list = traceback.format_exception(
None, context.error, context.error.__traceback__
)
tb_string = "".join(tb_list)
# Build the message with some markup and additional information about what happened.
# You might need to add some logic to deal with messages longer than the 4096 character limit.
update_str = update.to_dict() if isinstance(update, Update) else str(update)
message = (
"An exception was raised while handling an update\n"
f"<pre>update = {html.escape(json.dumps(update_str, indent=2, ensure_ascii=False))}"
"</pre>\n\n"
f"<pre>context.chat_data = {html.escape(str(context.chat_data))}</pre>\n\n"
f"<pre>context.user_data = {html.escape(str(context.user_data))}</pre>\n\n"
f"<pre>{html.escape(tb_string)}</pre>"
)
logger.error(context.error)
logger.info(f"error in tgbot {context.error}\nReply update")
TgBotClass.my_queue.put(update)
async def set_webhook(self, url):
if not self._app:
logger.error("no app")
return
app = self._app
await app.bot.set_webhook(
url, allowed_updates=Update.ALL_TYPES, drop_pending_updates=True
)
async def init_tg(self):
self._app = (
Application.builder()
.token(settings.TGBOT["token"])
.concurrent_updates(True)
.updater(None)
.build()
)
self._app.add_handler(
CommandHandler("start", self.start, filters.ChatType.PRIVATE)
)
self._app.add_handler(CommandHandler("my", self.my, filters.ChatType.PRIVATE))
self._app.add_handler(CommandHandler("inv", self.inv, filters.ChatType.PRIVATE))
self._app.add_handler(MessageHandler(filters.ChatType.PRIVATE, self.inv))
self._app.add_error_handler(self.error)
logger.info(
{
"app": self._app,
"bot": self._app.bot,
"handlers": self._app.handlers,
}
)
await self._app.initialize()
await self._app.start()
self._app
return self._app
TgBotUpdater.my_queue.put(update)

View File

@ -2,31 +2,27 @@ import json
from telegram import Update
from asgiref.sync import async_to_sync
from rest_framework import viewsets
from rest_framework import mixins, viewsets
from rest_framework.response import Response
from .apps import TgBotClass
from .models import Item
from .serializers import ItemSerializer
from .tgbot import TgBot
from .apps import TgBotUpdater
from .models import TgItem
from .serializers import TgItemSerializer
import logging
logger = logging.getLogger("root")
class ItemViewSet(viewsets.ViewSet):
queryset = Item.objects.all()
serializer_class = ItemSerializer
@async_to_sync
async def create(self, request):
req = json.loads(request.body)
update_item = Update.de_json(data=req, bot=TgBotClass.app.bot)
TgBotClass.my_queue.put(update_item)
logger.info(
f"Update from {update_item.message.chat.id} pass to que and its size is {TgBotClass.my_queue.qsize()}"
class TgItemViewSet(viewsets.ModelViewSet):
queryset = TgItem.objects.all()
serializer_class = TgItemSerializer
def create(self, request, *args, **kwargs):
TgBotUpdater.my_queue.put(
Update.de_json(data=json.loads(request.body), bot=TgBot.app.bot)
)
return Response({"result": "ok"})
# return super().create(request, *args, **kwargs)
return Response({"result": "pass data to tgbot"})