to_inventory/back/tgbot/apps.py

60 lines
1.6 KiB
Python

from asgiref.sync import async_to_sync
from django.apps import AppConfig
import asyncio
import threading
import queue
import time
class TgBotClass(AppConfig):
default_auto_field = "django.db.models.BigAutoField"
name = "tgbot"
is_run = False
app = None
update_queue = None
my_queue = queue.Queue()
# @async_to_sync
async def init_bot(self):
from django.conf import settings
from .tgbot import TgBotApp
tgbot = TgBotApp()
app = await tgbot.init_tg()
await tgbot.set_webhook(
f"https://{settings.TGBOT['base_url']}/api/tgbot/",
)
return app
async def some_function(self=None):
while True:
if not TgBotClass.my_queue.empty():
item = TgBotClass.my_queue.get()
await TgBotClass.app.process_update(item)
TgBotClass.my_queue.task_done()
time.sleep(1)
async def some_callback():
await TgBotClass.some_function()
def between_callback():
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
loop.run_until_complete(TgBotClass.some_callback())
loop.close()
def ready(self):
import os
if os.environ.get("RUN_MAIN", None) != "true":
return
if TgBotClass.is_run:
return
TgBotClass.is_run = True
TgBotClass.app = async_to_sync(self.init_bot, force_new_loop=True)()
thread = threading.Thread(target=TgBotClass.between_callback)
thread.setDaemon(True)
thread.start()