to_inventory/back/tgbot/apps.py

90 lines
2.4 KiB
Python

from django.apps import AppConfig
import asyncio
import threading
import queue
import os
class TgBotUpdater:
is_run = False
app = None
tgbot_class = None
update_queue = None
my_queue = queue.Queue()
import logging
logger = logging.getLogger(__name__)
def __init__(self) -> None:
self.loop = asyncio.get_event_loop()
try:
from .tgbot import TgBot
self.tgbot_class = TgBot()
except Exception as e:
self.logger.error(e)
TgBotUpdater.is_run = True
async def _set_webhook(self):
await self.tgbot_class.set_webhook()
async def _start_app(self):
await self.tgbot_class.start_app()
async def _set_hadlers(self):
await self.tgbot_class.set_handlers()
async def _run_func(self):
from .tgbot import TgBot
while hasattr(TgBot, "app"):
# self.logger.info(f"check updates in {await TgBot.app.bot.get_webhook_info()}")
if not TgBotUpdater.my_queue.empty():
item = TgBotUpdater.my_queue.get()
if (
isinstance(item, dict)
and "name" in item
and item["name"].startswith("admin_")
):
await self.tgbot_class.admin_action(item["name"], item["queryset"])
else:
try:
await TgBot.app.process_update(item)
except Exception as e:
print(f"Error in tg thread {e}")
await TgBot.app.process_update(item)
TgBotUpdater.my_queue.task_done()
await asyncio.sleep(3)
async def main(self):
await asyncio.gather(
self._set_webhook(),
self._set_hadlers(),
self._start_app(),
self._run_func(),
)
def run_func(self):
asyncio.set_event_loop(self.loop)
self.loop.run_until_complete(self.main())
self.loop.close()
class TgbotConfig(AppConfig):
default_auto_field = "django.db.models.BigAutoField"
name = "tgbot"
def ready(self):
if not TgBotUpdater.is_run and os.environ.get("RUN_MAIN", None) == "true":
threading.Thread(
target=(TgBotUpdater().run_func),
name="tg_updater_thread",
daemon=True,
).start()
return super().ready()