bx-gitea-integration/app/main.py

73 lines
2.1 KiB
Python

import requests
import json
import re
from urllib.parse import parse_qs
from fastapi import FastAPI, Request
from app.constants import *
# ASGI application instance; the route decorators in this module register
# their handlers on it.
app = FastAPI()
# NOTE(review): not referenced anywhere in the visible code. Presumably maps a
# Bitrix user id to a Telegram chat id — confirm against callers or remove.
tg = {38: 198225670}
@app.get("/")
def read_root():
    """Answer requests to the root path with a static service identifier."""
    service_info = {"service": "BX integrations"}
    return service_info
@app.post("/integration")
async def intgr(request: Request):
    """Relay a Gitea webhook event to a Bitrix task as a comment.

    The branch name is the last ``/``-separated segment of the payload's
    ``ref`` and is expected to look like ``<tag>-<task number>[-<extra>...]``.
    The task number selects the task that receives the comment. The comment
    body is the head commit message, or a "branch created" notice when the
    payload carries no head commit.

    Args:
        request: Incoming webhook request with a JSON body containing at
            least ``ref`` and ``repository.full_name``.

    Returns:
        ``{"status": "success"}`` once the comment request has been sent;
        ``{"status": "error"}`` when the branch name carries no task number
        or the HTTP request to the Bitrix webhook fails.

    Raises:
        KeyError: If ``ref`` or ``repository.full_name`` is missing from the
            payload.
    """
    body = await request.json()
    ref = body["ref"]
    branch = ref.split("/")[-1]

    # Branches such as "main" have no "<tag>-<number>" prefix. Unpacking them
    # blindly raises ValueError (HTTP 500), and a non-numeric "number" would
    # only produce a pointless API call, so reject both up front.
    parts = branch.split("-")
    if len(parts) < 2 or not re.fullmatch(r"[0-9]+", parts[1]):
        logger.info(f"branch {branch} carries no task number, nothing to post")
        return {"status": "error"}
    tag, number, *args = parts
    logger.info(f"input tag {tag} number {number} args {args}")

    # A missing, null or empty head_commit is treated as a branch creation.
    head_commit = body.get("head_commit")
    event_text = head_commit["message"] if head_commit else "Создана ветка"
    bx_comment = (
        f"Сообщение от GITEA для задачи {number} (ветка {ref} в репозитории {body['repository']['full_name']})"
        "\n\n"
        f"{event_text}"
    )

    try:
        # Pass the values through ``params`` so they are URL-encoded: the
        # comment contains spaces, newlines, non-ASCII text and arbitrary
        # commit-message characters such as "&" or "#".
        bx_res = requests.get(
            f"{WEBHOOK}/task.commentitem.add.json",
            params={"taskId": number, "fields[POST_MESSAGE]": bx_comment},
            timeout=10,  # never let a stalled upstream hang the webhook
        )
    except requests.RequestException:
        logger.exception("request to the Bitrix webhook failed")
        return {"status": "error"}

    try:
        logger.info(f"result {json.loads(bx_res.text)}")
    except ValueError:
        # Non-JSON reply (e.g. an HTML error page): log it verbatim instead
        # of failing after the request has already been sent.
        logger.info(f"result (not JSON) {bx_res.text}")
    return {"status": "success"}
@app.post("/integration_tg/{bx_id}/{tg_id}")
async def tg_intgr_get(request: Request):
    """Forward a task comment to a Telegram chat when it mentions a user.

    The path parameters ``bx_id`` (user id looked for in ``[USER=<id>]``
    mention tags) and ``tg_id`` (Telegram chat id) are read from the request.
    The form-encoded body must carry ``data[FIELDS_AFTER][TASK_ID]`` and
    ``data[FIELDS_AFTER][ID]``, which identify the comment to fetch through
    the ``task.commentitem.get`` webhook method.

    When the fetched comment mentions ``bx_id``, its text (mention tags
    stripped, task link appended) is sent once to ``tg_id``.

    Args:
        request: Incoming webhook request.

    Returns:
        ``{"status": "success"}`` when the comment was processed (whether or
        not a message had to be sent), ``{"status": "error"}`` on any failure.
    """
    try:
        body = await request.body()
        query = parse_qs(body)
        bx_id = request.path_params["bx_id"]
        tg_id = request.path_params["tg_id"]
        task_id = query[b"data[FIELDS_AFTER][TASK_ID]"][0].decode()
        item_id = query[b"data[FIELDS_AFTER][ID]"][0].decode()

        data = requests.get(
            f"{WEBHOOK}/task.commentitem.get.json",
            params={"taskId": task_id, "ITEMID": item_id},
            timeout=10,
        )
        raw_comment = data.json()["result"]["POST_MESSAGE"]

        # Look for the mention in the raw text: once the [USER=..] tags are
        # stripped below there is nothing left to match. ``bx_id`` comes from
        # the URL, so escape it before embedding it in the pattern.
        if not re.search(rf"\[USER={re.escape(bx_id)}\]", raw_comment):
            return {"status": "success"}

        # Drop the [USER=<id>] / [/USER] markup but keep the text in between.
        comment = re.sub(r"\[USER=\d+\]|\[/USER\]", "", raw_comment)
        comment += f'\nhttps://crm.svs-tech.pro/company/personal/user/{bx_id}/tasks/task/view/{task_id}/'

        # One notification per comment, however many times the user is
        # mentioned. ``params`` URL-encodes the text (newlines, "&", "#").
        tg_res = requests.get(
            f"https://api.telegram.org/bot{TOKEN}/sendMessage",
            params={"chat_id": tg_id, "text": comment},
            timeout=10,
        )
        tg_res.raise_for_status()
        return {"status": "success"}
    except Exception:
        # Boundary handler: keep answering the webhook, but record the
        # traceback. ``Exception`` (not a bare ``except``) lets task
        # cancellation and interpreter shutdown propagate.
        logger.exception(f"integration_tg failed, request: {request.__dict__}")
        return {"status": "error"}