"""HTTP service ("BX integrations") bridging git webhooks, a task tracker and Telegram.

The tracker is reached through the REST webhook base URL ``WEBHOOK``
(presumably a Bitrix24 inbound webhook - confirm against ``app.constants``).
"""

import json
import locale
import re
from datetime import datetime
from pathlib import Path
from urllib.parse import parse_qs

import requests
from fastapi import FastAPI, HTTPException, Request
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import HTMLResponse, JSONResponse
from fastapi.staticfiles import StaticFiles
from fastapi.templating import Jinja2Templates

# NOTE: the star import supplies (at least) WEBHOOK, TOKEN and logger used below.
from app.constants import *

# Seconds to wait for any outbound HTTP call; without a timeout ``requests``
# can block forever on an unresponsive peer.
_HTTP_TIMEOUT = 15

# The task list endpoint is read in pages of this size ...
_TASK_PAGE_SIZE = 50
# ... and at most this many pages are fetched per deal.
_TASK_MAX_PAGES = 4

# Fields requested for every task shown in the deal tab.
_TASK_SELECT_FIELDS = (
    "ID",
    "TITLE",
    "RESPONSIBLE_ID",
    "CREATED_DATE",
    "DEADLINE",
    "STATUS",
    "GROUP_ID",
)

# Matches one ``[...]`` tag (non-greedy, single line) or a double quote.
_MARKUP_RE = re.compile(r"\[.*?]|\"")

app = FastAPI()
templates = Jinja2Templates(directory="templates")

origins = [
    "http://localhost",
    "http://localhost:3000",
    "http://localhost:3001",
    # "http://192.168.100.242"
]

app.add_middleware(
    CORSMiddleware,
    allow_origins=origins,
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

# Month names produced by ``strftime("%B")`` should be Russian; if the locale
# is not installed we log and keep the default one.
try:
    locale.setlocale(locale.LC_TIME, "ru_RU")
except locale.Error as e:
    logger.error(e)


# Custom Jinja2 filter for formatting dates.
def format_datetime(value, format="%d %B %Y"):
    """Format an ISO-8601 date string in a human-readable form.

    Args:
        value: Date/time string accepted by ``datetime.fromisoformat``.
        format: ``strftime`` pattern applied to the parsed date.

    Returns:
        The formatted string, or ``value`` unchanged when it cannot be
        parsed or formatted.
    """
    try:
        return datetime.fromisoformat(value).strftime(format)
    except (TypeError, ValueError):
        # Best-effort filter: show the raw value rather than break the page.
        return value


# Human-readable names of task status codes.
statuses = {
    "1": "Новая",
    "2": "Ждет выполнения",
    "3": "Выполняется",
    "4": "Требуется контроль",
    "5": "Завершена",
    "6": "Отложена",
    "7": "Отклонена",
}


# Custom Jinja2 filter for formatting task statuses.
def format_status(value):
    """Return the display name for a status code, or the code itself if unknown."""
    value = str(value)
    return statuses.get(value, value)


# Register the filters with Jinja2Templates.
templates.env.filters["format_status"] = format_status
templates.env.filters["format_datetime"] = format_datetime


def _portal_domain():
    """Return ``https://<host>`` where ``<host>`` is the third ``/``-separated part of WEBHOOK."""
    parts = WEBHOOK.split("/")
    return f"https://{parts[2]}"


def _strip_markup(text):
    """Remove ``[...]`` tags and double quotes from a comment body."""
    return _MARKUP_RE.sub("", text)


@app.get("/")
def read_root():
    """Return a static payload identifying the service."""
    return {"service": "BX integrations"}


@app.post("/integration")
async def intgr(request: Request):
    """Post a task comment describing a git push or branch creation.

    The JSON body must contain ``ref`` and ``repository.full_name``; the last
    path segment of ``ref`` is expected to look like ``<tag>-<task id>[-...]``.
    The comment text is the head commit message when ``head_commit`` is
    present in the body, otherwise a "branch created" notice.

    Returns:
        ``{"status": "success"}`` after the comment request was sent, or
        ``{"status": "error"}`` when the branch name carries no task number.
    """
    body = await request.json()
    ref = body["ref"]
    branch = ref.split("/")[-1]

    parts = branch.split("-")
    if len(parts) < 2:
        # Branches such as "main" carry no task number; nothing to comment on.
        logger.error(f"branch {branch} does not match <tag>-<number>")
        return {"status": "error"}
    tag, number, *args = parts
    logger.info(f"input tag {tag} number {number} args {args}")

    if "head_commit" in body:
        details = body["head_commit"]["message"]
    else:
        details = "Создана ветка"
    bx_comment = (
        f"Сообщение от GITEA для задачи {number} (ветка {ref} в репозитории {body['repository']['full_name']})"
        "\n\n"
        f"{details}"
    )

    # NOTE(review): requests.get is blocking and stalls the event loop while
    # it waits; consider a threadpool or an async HTTP client.
    # ``params`` percent-encodes the message, so "&", "#" or newlines in a
    # commit message can no longer truncate or corrupt the query string.
    bx_res = requests.get(
        f"{WEBHOOK}/task.commentitem.add.json",
        params={"taskId": number, "fields[POST_MESSAGE]": bx_comment},
        timeout=_HTTP_TIMEOUT,
    )
    logger.info(f"result {bx_res.json()}")
    return {"status": "success"}


@app.post("/integration_tg/{bx_id}/{tg_id}")
async def tg_intgr_get(request: Request):
    """Forward a task comment to Telegram when it mentions user ``bx_id``.

    The form-encoded body must carry ``data[FIELDS_AFTER][TASK_ID]`` and
    ``data[FIELDS_AFTER][ID]``. The comment is fetched from the tracker and,
    if it contains ``[USER=<bx_id>]``, its first 100 space-separated words
    (with ``[...]`` tags and quotes removed) are sent to chat ``tg_id``.

    Returns:
        ``{"status": "success"}`` when processing finished (whether or not a
        mention was found), ``{"status": "error"}`` on any failure.
    """
    try:
        body = await request.body()
        query = parse_qs(body)  # bytes in, so keys and values are bytes
        bx_id = request.path_params["bx_id"]
        tg_id = request.path_params["tg_id"]
        task_id = query[b"data[FIELDS_AFTER][TASK_ID]"][0].decode()
        item_id = query[b"data[FIELDS_AFTER][ID]"][0].decode()

        comment_response = requests.get(
            f"{WEBHOOK}/task.commentitem.get.json",
            params={"taskId": task_id, "ITEMID": item_id},
            timeout=_HTTP_TIMEOUT,
        )
        comment_json = comment_response.json()
        message = comment_json["result"]["POST_MESSAGE"]

        # bx_id comes from the URL path, so escape it before using it in a regex.
        if re.search(rf"\[USER={re.escape(bx_id)}\]", message):
            logger.info(f"sending tg mention to {bx_id}")
            task_response = requests.get(
                f"{WEBHOOK}/tasks.task.get.json",
                params={"taskId": task_id, "select[0]": "TITLE"},
                timeout=_HTTP_TIMEOUT,
            )
            task = task_response.json()["result"]["task"]

            # Keep the notification short: at most the first 100 words.
            comment_body = " ".join(_strip_markup(message).split(" ")[:100])
            comment = (
                f"Упоминание от {comment_json['result']['AUTHOR_NAME']}\n"
                f"Задача {task['id']} [**{task['title']}**]"
                f"(https://crm.svs-tech.pro/company/personal/user/{bx_id}/tasks/task/view/{task_id}/)\n\n"
            ) + comment_body

            # NOTE(review): MarkdownV2 requires escaping of many punctuation
            # characters; unescaped text may be rejected by Telegram - confirm.
            requests.get(
                f"https://api.telegram.org/bot{TOKEN}/sendMessage",
                params={
                    "chat_id": tg_id,
                    "text": comment,
                    "parse_mode": "MarkdownV2",
                },
                timeout=_HTTP_TIMEOUT,
            )
        return {"status": "success"}
    except Exception as e:
        # Top-level boundary of the handler: log and report a generic error.
        logger.error(e)
        return {"status": "error"}


def extract_id(params):
    """Return the first ``ID`` found in the ``PLACEMENT_OPTIONS`` form values.

    Args:
        params: Mapping as produced by ``parse_qs``; the values under
            ``PLACEMENT_OPTIONS`` are expected to be JSON-encoded objects.

    Returns:
        The ``ID`` value of the first JSON object that has one, or ``None``
        when no value contains an ``ID``.
    """
    for json_str in params.get("PLACEMENT_OPTIONS", []):
        try:
            data = json.loads(json_str)
        except json.JSONDecodeError:
            logger.error(f"Ошибка: Некорректная JSON-строка - {json_str}")
            continue
        # Valid JSON may still be a list, number or null; only objects carry an ID.
        if isinstance(data, dict) and "ID" in data:
            return data["ID"]
    return None


def get_task_by_deal(deal_id):
    """Fetch the tasks linked to a CRM deal.

    Tasks are requested with the filter ``UF_CRM_TASK = D_<deal_id>`` in
    pages of 50; paging stops at the first page that is not full or after
    four pages (200 tasks).

    Args:
        deal_id: Identifier of the deal.

    Returns:
        List of task dicts as returned under ``result.tasks``.
    """
    url = f"{WEBHOOK}/tasks.task.list"
    # A list of pairs lets the repeated ``select[]`` key be sent several times
    # and keeps deal_id from injecting extra query parameters.
    base_params = [("select[]", field) for field in _TASK_SELECT_FIELDS]
    base_params.append(("filter[UF_CRM_TASK]", f"D_{deal_id}"))

    tasks = []
    for page in range(_TASK_MAX_PAGES):
        params = list(base_params)
        if page:
            params.append(("start", page * _TASK_PAGE_SIZE))
        response = requests.get(url, params=params, timeout=_HTTP_TIMEOUT)
        page_tasks = response.json()["result"]["tasks"]
        tasks.extend(page_tasks)
        if len(page_tasks) != _TASK_PAGE_SIZE:
            break
    return tasks


@app.post("/deal_tab")
async def deal_tab(
    request: Request,
    DOMAIN: str | None = None,
    PROTOCOL: str | None = None,
    LANG: str | None = None,
    APP_SID: str | None = None,
):
    """Render the ``deal_tab.html`` template with the tasks of a deal.

    The deal id is read from the ``PLACEMENT_OPTIONS`` field of the
    form-encoded body. The query parameters are accepted but not used.

    Returns:
        The rendered template, or ``{"status": "error"}`` on any failure
        (including a missing deal id).
    """
    try:
        body = await request.body()
        form = parse_qs(body.decode())
        deal_id = extract_id(form)
        if not deal_id:
            raise ValueError("not deal id")
        tasks = get_task_by_deal(deal_id)
        return templates.TemplateResponse(
            request=request,
            name="deal_tab.html",
            context={"tasks": tasks, "domain": _portal_domain()},
        )
    except Exception as e:
        logger.error(e)
        return {"status": "error"}


# NOTE(review): this handler reuses the name ``deal_tab`` and therefore shadows
# the one above at module level; both routes stay registered. Kept as-is so
# existing route names do not change.
@app.post("/widget/deal_tab/{deal_id}")
async def deal_tab(request: Request, deal_id):
    """Return the tasks of ``deal_id`` as JSON together with display metadata.

    Returns:
        ``{"status": "success", "result": [...], "format": {...}}`` or
        ``{"status": "error"}`` on any failure.
    """
    try:
        tasks = get_task_by_deal(deal_id)
        return {
            "status": "success",
            "result": tasks,
            "format": {"domain": _portal_domain(), "statuses": statuses},
        }
    except Exception as e:
        logger.error(e)
        return {"status": "error"}


app.mount("/widget", StaticFiles(directory="front/dist", html=True), name="static")

NUXT_DIST = Path("front/dist")


@app.post("/widget")
async def handle_widget_post(request: Request):
    """Serve the generated ``deal/index.html`` page with the deal id injected.

    The deal id is read from the ``PLACEMENT_OPTIONS`` field of the
    form-encoded body and substituted for the ``data_deal_id:0`` placeholder.

    Returns:
        The HTML page (200), or a JSON error with status 404 when the
        generated page is missing.

    Raises:
        HTTPException: 400 when the body carries no deal id.
    """
    body = await request.body()
    form = parse_qs(body.decode())
    deal_id = extract_id(form)
    if not deal_id:
        raise HTTPException(status_code=400, detail="not deal id")
    logger.info(form)

    html_file = NUXT_DIST / "deal/index.html"
    if not html_file.exists():
        return JSONResponse(
            content={"error": "Nuxt.js page not found"}, status_code=404
        )

    # Read explicitly as UTF-8 so the result does not depend on the host locale.
    html_content = html_file.read_text(encoding="utf-8")
    # NOTE(review): deal_id comes from the request body and is inserted into
    # the page verbatim; confirm it is always numeric or validate it here.
    html_content = html_content.replace("data_deal_id:0", f"data_deal_id:{deal_id}")
    return HTMLResponse(content=html_content, status_code=200)