"""Webhook endpoints that relay Gitea pushes and task-comment mentions.

``/integration`` receives a Gitea push/create payload and adds a comment to
the task whose number is encoded in the branch name. ``/integration_tg``
receives a form-encoded comment event, and when the comment mentions the
given user it forwards an excerpt to a Telegram chat.
"""
import json
import re
from urllib.parse import parse_qs

import requests
from fastapi import FastAPI, Request

# WEBHOOK, TOKEN and logger are not defined in this module; they are expected
# to come from this star import.
from app.constants import *

# Seconds to wait on an upstream HTTP API before giving up, so that a stalled
# upstream cannot hang a request handler forever.
_HTTP_TIMEOUT = 10

# Maximum number of space-separated words of a comment forwarded to Telegram.
_MAX_COMMENT_WORDS = 100

# Matches a double quote or a single-line "[...]" tag (``.`` does not cross
# newlines, so a tag spanning several lines is left in place).
_TAG_OR_QUOTE = re.compile(r"\[.*?\]|\"")

app = FastAPI()


def _json_or_text(response):
    """Return the decoded JSON body of *response*, or its raw text if not JSON."""
    try:
        return json.loads(response.text)
    except ValueError:
        return response.text


def _build_mention_text(author, task, bx_id, task_id, comment_text):
    """Compose the Telegram notification: header, task link, comment excerpt."""
    cleaned = _TAG_OR_QUOTE.sub("", comment_text)
    excerpt = " ".join(cleaned.split(" ")[:_MAX_COMMENT_WORDS])
    return (
        f"Упоминание в задаче от {author}\n"
        f"Задача: {task['id']} {task['title']}\n"
        f"https://crm.svs-tech.pro/company/personal/user/{bx_id}"
        f"/tasks/task/view/{task_id}/ \n"
        f"{excerpt}"
    )


@app.get("/")
def read_root():
    """Return a static payload identifying the service."""
    return {"service": "BX integrations"}


@app.post("/integration")
async def intgr(request: Request):
    """Post a task comment describing a push to a ``<tag>-<number>[-...]`` branch.

    The JSON body must contain ``ref`` and ``repository.full_name``; the
    comment text is the head commit message when ``head_commit`` is present,
    otherwise a "branch created" notice.

    Returns:
        ``{"status": "success"}`` after the comment request was sent, or
        ``{"status": "skipped"}`` when the branch name carries no task number.

    Raises:
        KeyError: if ``ref`` or ``repository.full_name`` is missing.
        requests.RequestException: if the upstream call fails or times out.
    """
    body = await request.json()
    ref = body["ref"]
    branch = ref.split("/")[-1]

    # Branches such as "main" have no "-<number>" part; unpacking them used to
    # raise ValueError and turn every such push into a 500.
    parts = branch.split("-")
    if len(parts) < 2:
        logger.info(f"ref {ref} has no task number, skipping")
        return {"status": "skipped"}
    tag, number, *args = parts
    logger.info(f"input tag {tag} number {number} args {args}")

    # A missing *or* null head_commit both mean "no commit to report".
    head_commit = body.get("head_commit")
    summary = head_commit["message"] if head_commit else "Создана ветка"
    bx_comment = (
        f"Сообщение от GITEA для задачи {number} "
        f"(ветка {ref} в репозитории {body['repository']['full_name']})"
        "\n\n"
        f"{summary}"
    )

    # params= percent-encodes the values; a raw commit message containing
    # "&", "#" or newlines would otherwise corrupt the query string.
    # NOTE: requests is blocking and runs on the event loop here.
    bx_res = requests.get(
        f"{WEBHOOK}/task.commentitem.add.json",
        params={"taskId": number, "fields[POST_MESSAGE]": bx_comment},
        timeout=_HTTP_TIMEOUT,
    )
    logger.info(f"result {_json_or_text(bx_res)}")
    return {"status": "success"}


@app.post("/integration_tg/{bx_id}/{tg_id}")
async def tg_intgr_get(request: Request):
    """Forward a task comment to Telegram when it mentions user ``bx_id``.

    The form-encoded body must carry ``data[FIELDS_AFTER][TASK_ID]`` and
    ``data[FIELDS_AFTER][ID]``. The comment is fetched, and if its text
    contains ``[USER=<bx_id>]`` a message with the author, task id/title,
    task link and the first 100 words of the comment (tags and double quotes
    stripped) is sent to chat ``tg_id``.

    Returns:
        ``{"status": "success"}`` when processing finished (whether or not a
        mention was found), ``{"status": "error"}`` if anything raised.
    """
    try:
        body = await request.body()
        query = parse_qs(body)  # bytes in, so keys and values are bytes
        bx_id = request.path_params["bx_id"]
        tg_id = request.path_params["tg_id"]
        task_id = query[b"data[FIELDS_AFTER][TASK_ID]"][0].decode()
        item_id = query[b"data[FIELDS_AFTER][ID]"][0].decode()

        comment = requests.get(
            f"{WEBHOOK}/task.commentitem.get.json",
            params={"taskId": task_id, "ITEMID": item_id},
            timeout=_HTTP_TIMEOUT,
        ).json()["result"]
        post_message = comment["POST_MESSAGE"]

        # Plain substring test: bx_id comes from the URL path and must not be
        # interpreted as a regular expression.
        if f"[USER={bx_id}]" in post_message:
            logger.info(f"sending tg mention to {bx_id}")
            task = requests.get(
                f"{WEBHOOK}/tasks.task.get.json",
                params={"taskId": task_id, "select[0]": "TITLE"},
                timeout=_HTTP_TIMEOUT,
            ).json()["result"]["task"]

            text = _build_mention_text(
                comment["AUTHOR_NAME"], task, bx_id, task_id, post_message
            )
            # params= encodes newlines, "&" and "#" in the message text.
            requests.get(
                f"https://api.telegram.org/bot{TOKEN}/sendMessage",
                params={"chat_id": tg_id, "text": text},
                timeout=_HTTP_TIMEOUT,
            )
        return {"status": "success"}
    except Exception:
        # Top-level boundary for the webhook: report failure to the caller
        # instead of a 500, but keep the traceback in the log.
        logger.exception("tg mention integration failed")
        return {"status": "error"}