bx-gitea-integration/app/main.py

278 lines
8.1 KiB
Python

import requests
import json
import re
from datetime import datetime
import locale
from urllib.parse import parse_qs
from pathlib import Path
from fastapi import FastAPI, Request
from fastapi.staticfiles import StaticFiles
from fastapi.templating import Jinja2Templates
from fastapi.responses import HTMLResponse
from fastapi.middleware.cors import CORSMiddleware
from app.constants import *
# Application object plus the Jinja2 environment used for server-rendered pages.
app = FastAPI()
templates = Jinja2Templates(directory="templates")

# Browser origins allowed to call this API with credentials (localhost only).
origins = [
    "http://localhost",
    "http://localhost:3000",
    "http://localhost:3001",
    # "http://192.168.100.242"
]
app.add_middleware(
    CORSMiddleware,
    allow_origins=origins,
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

# strftime("%B") takes month names from LC_TIME, so try to switch it to Russian.
# When the locale cannot be set the failure is only logged and startup continues.
try:
    locale.setlocale(locale.LC_TIME, "ru_RU")
except Exception as err:
    logger.error(err)
# Custom Jinja2 filter for formatting dates.
def format_datetime(value, format="%d %B %Y"):
    """Render an ISO 8601 date/datetime string in a human-readable form.

    Args:
        value: Text accepted by ``datetime.fromisoformat``.
        format: ``strftime`` pattern applied to the parsed value.

    Returns:
        The formatted string, or ``value`` unchanged when it cannot be parsed
        or formatted (so a bad value never breaks template rendering).
    """
    try:
        parsed = datetime.fromisoformat(value)
        rendered = parsed.strftime(format)
    except Exception:
        return value
    return rendered
# Human-readable (Russian) labels for task status codes, keyed by the code as a string.
statuses = {
    "1": "Новая",
    "2": "Ждет выполнения",
    "3": "Выполняется",
    "4": "Требуется контроль",
    "5": "Завершена",
    "6": "Отложена",
    "7": "Отклонена",
}


# Custom Jinja2 filter that turns a status code into its label.
def format_status(value):
    """Translate a status code into its label from ``statuses``.

    Args:
        value: Status code of any type; it is compared by its ``str()`` form.

    Returns:
        The matching label, or the stringified code when it is not in the table.
    """
    code = str(value)
    return statuses.get(code, code)
# Register the custom filters on the Jinja2Templates environment.
templates.env.filters.update(
    format_status=format_status,
    format_datetime=format_datetime,
)
@app.get("/")
def read_root():
    """Return a static payload that names this service."""
    service_info = {"service": "BX integrations"}
    return service_info
@app.post("/integration")
async def intgr(request: Request):
    """Post a comment on a Bitrix task for an incoming repository webhook.

    The JSON body must contain ``ref`` and ``repository.full_name``; an
    optional ``head_commit.message`` becomes the comment text. The task number
    is taken from the branch name, i.e. the last ``/`` segment of ``ref``,
    which is expected to look like ``<tag>-<task number>[-<anything>...]``.

    Returns:
        ``{"status": "success"}`` once the comment request has been sent, or
        ``{"status": "skipped"}`` when the branch name carries no task number
        (for example ``main``), in which case nothing is sent.

    Raises:
        KeyError: If ``ref`` or ``repository.full_name`` is missing.
    """
    body = await request.json()
    ref = body["ref"]
    branch = ref.split("/")[-1]

    # Branches without a numeric second segment previously crashed the
    # unpacking below and turned every such push into a 500.
    parts = branch.split("-")
    if len(parts) < 2 or not parts[1].isdecimal():
        logger.info(f"skip ref {ref}: no task number in branch name")
        return {"status": "skipped"}
    tag, number, *args = parts
    logger.info(f"input tag {tag} number {number} args {args}")

    # A missing or null head_commit is treated as a branch-creation event.
    head_commit = body.get("head_commit")
    message = head_commit["message"] if head_commit else "Создана ветка"
    bx_comment = (
        f"Сообщение от GITEA для задачи {number} (ветка {ref} в репозитории {body['repository']['full_name']})"
        "\n\n"
        f"{message}"
    )

    # Let requests build the query string: commit messages routinely contain
    # '&', '#', '+' or newlines, which corrupt a hand-assembled URL.
    bx_res = requests.get(
        f"{WEBHOOK}/task.commentitem.add.json",
        params={"taskId": number, "fields[POST_MESSAGE]": bx_comment},
        timeout=10,  # never hang the worker on an unresponsive server
    )

    # The reply is logged only, so a non-JSON body must not fail the request.
    try:
        bx_result = bx_res.json()
    except ValueError:
        bx_result = bx_res.text
    logger.info(f"result {bx_result}")
    return {"status": "success"}
def _escape_markdown_v2(text):
    """Backslash-escape every character that Telegram MarkdownV2 reserves."""
    return re.sub(r"([_*\[\]()~`>#+\-=|{}.!\\])", r"\\\1", str(text))


@app.post("/integration_tg/{bx_id}/{tg_id}")
async def tg_intgr_get(request: Request):
    """Forward a task-comment mention of one Bitrix user to a Telegram chat.

    The form-encoded body must carry ``data[FIELDS_AFTER][TASK_ID]`` and
    ``data[FIELDS_AFTER][ID]``. The comment is fetched through the webhook and,
    when its text contains ``[USER=<bx_id>]``, a message with the author, the
    task title (as a link) and the first 100 space-separated words of the
    comment is sent to chat ``tg_id``.

    Returns:
        ``{"status": "success"}`` when processing finished (whether or not a
        mention was found), ``{"status": "error"}`` on any exception, which is
        logged.
    """
    try:
        body = await request.body()
        query = parse_qs(body)  # bytes in, so keys and values are bytes
        bx_id = request.path_params["bx_id"]
        tg_id = request.path_params["tg_id"]
        task_id = query[b"data[FIELDS_AFTER][TASK_ID]"][0].decode()
        item_id = query[b"data[FIELDS_AFTER][ID]"][0].decode()

        comment_json = requests.get(
            f"{WEBHOOK}/task.commentitem.get.json",
            params={"taskId": task_id, "ITEMID": item_id},
            timeout=10,
        ).json()
        post_message = comment_json["result"]["POST_MESSAGE"]

        # bx_id comes from the URL, so it must be matched literally.
        if re.search(rf"\[USER={re.escape(bx_id)}\]", post_message):
            logger.info(f"sending tg mention to {bx_id}")
            task_json = requests.get(
                f"{WEBHOOK}/tasks.task.get.json",
                params={"taskId": task_id, "select[0]": "TITLE"},
                timeout=10,
            ).json()
            task = task_json["result"]["task"]

            # Drop double quotes and [..] tags, then keep the first 100 words.
            plain_text = re.sub(r"\[.*?\]", "", post_message.replace('"', ""))
            comment_body = " ".join(plain_text.split(" ")[:100])

            # MarkdownV2 rejects a message containing an unescaped reserved
            # character ('.', '-', '!', ...), so all dynamic text is escaped.
            # Bold is written with single asterisks in MarkdownV2.
            author = _escape_markdown_v2(comment_json["result"]["AUTHOR_NAME"])
            title = _escape_markdown_v2(task["title"])
            comment = (
                f"Упоминание от {author}\n"
                f"Задача {_escape_markdown_v2(task['id'])} [*{title}*]"
                f"(https://crm.svs-tech.pro/company/personal/user/{bx_id}/tasks/task/view/{task_id}/)\n\n"
            ) + _escape_markdown_v2(comment_body)

            tg_res = requests.get(
                f"https://api.telegram.org/bot{TOKEN}/sendMessage",
                params={
                    "chat_id": tg_id,
                    "text": comment,
                    "parse_mode": "MarkdownV2",
                },
                timeout=10,
            )
            if not tg_res.ok:
                # Surface rejected messages instead of dropping them silently.
                logger.error(f"telegram sendMessage failed: {tg_res.text}")
        return {"status": "success"}
    except Exception as e:
        logger.error(e)
        return {"status": "error"}
def extract_id(params):
    """Return the ``ID`` from the first usable ``PLACEMENT_OPTIONS`` entry.

    Args:
        params: Mapping as produced by ``urllib.parse.parse_qs``; the
            ``PLACEMENT_OPTIONS`` key, when present, holds a list of JSON
            strings.

    Returns:
        The ``ID`` value of the first entry that decodes to a JSON object
        containing that key, or ``None`` when there is no such entry.
        Malformed JSON is logged and skipped.
    """
    for json_str in params.get("PLACEMENT_OPTIONS", []):
        try:
            data = json.loads(json_str)
        except json.JSONDecodeError:
            logger.error(f"Ошибка: Некорректная JSON-строка - {json_str}")
            continue
        # Valid JSON need not be an object: null or a number would raise on
        # the membership test, and a list would raise on the string index.
        if isinstance(data, dict) and "ID" in data:
            return data["ID"]
    return None
def get_task_by_deal(deal_id):
    """Fetch the tasks linked to a CRM deal through the ``tasks.task.list`` webhook.

    Args:
        deal_id: Deal identifier; the filter sent is ``UF_CRM_TASK = D_<deal_id>``.

    Returns:
        A list of task dicts restricted to the selected fields. Pages are
        followed while each one is full, up to four pages (200 tasks).

    Raises:
        KeyError: If a response lacks ``result.tasks``.
    """
    page_size = 50  # page length this pagination loop relies on
    max_tasks = page_size * 4  # hard cap on how much is fetched per deal
    select = [
        "ID",
        "TITLE",
        "RESPONSIBLE_ID",
        "CREATED_DATE",
        "DEADLINE",
        "STATUS",
        "GROUP_ID",
    ]
    # Pass parameters separately so that deal_id (which can originate from a
    # URL path segment) is encoded and cannot inject extra query parameters.
    base_params = [("select[]", field) for field in select]
    base_params.append(("filter[UF_CRM_TASK]", f"D_{deal_id}"))
    url = f"{WEBHOOK}/tasks.task.list"

    tasks = []
    while True:
        params = list(base_params)
        if tasks:
            # Every page so far was full, so the count is the next offset.
            params.append(("start", len(tasks)))
        response = requests.get(url, params=params, timeout=10)
        page = response.json()["result"]["tasks"]
        tasks.extend(page)
        if len(page) != page_size or len(tasks) >= max_tasks:
            return tasks
@app.post("/deal_tab")
async def deal_tab(
    request: Request,
    DOMAIN: str | None = None,
    PROTOCOL: str | None = None,
    LANG: str | None = None,
    APP_SID: str | None = None,
):
    """Render the ``deal_tab.html`` page listing the tasks linked to a deal.

    The deal ID is read from the ``PLACEMENT_OPTIONS`` field of the
    form-encoded body. The ``DOMAIN``, ``PROTOCOL``, ``LANG`` and ``APP_SID``
    query parameters are accepted but not used.

    Returns:
        The rendered template on success, or ``{"status": "error"}`` when the
        deal ID is missing or anything else fails (the error is logged).
    """
    try:
        body = await request.body()
        form = parse_qs(body.decode())
        deal_id = extract_id(form)
        if not deal_id:
            # Raising a plain string is itself a TypeError; use a real exception.
            raise ValueError("not deal id")
        # A hard-coded "deal_id = 49" debug override used to sit here; the ID
        # taken from the request is now the one that is used.
        tasks = get_task_by_deal(deal_id)
        # WEBHOOK is "<scheme>://<host>/...", so element 2 of the split is the host.
        parts = WEBHOOK.split("/")
        domain = f"https://{parts[2]}"
        return templates.TemplateResponse(
            request=request,
            name="deal_tab.html",
            context={"tasks": tasks, "domain": domain},
        )
    except Exception as e:
        logger.error(e)
        return {"status": "error"}
# NOTE(review): this handler reuses the Python name of the "/deal_tab" handler
# defined earlier in this module. Both routes stay registered because the
# decorator runs at definition time, but the module-level name now points here.
@app.post("/widget/deal_tab/{deal_id}")
async def deal_tab(request: Request, deal_id):
    """Return the tasks linked to ``deal_id`` as JSON.

    Returns:
        ``{"status": "success", "result": <tasks>, "format": {...}}`` where
        ``format`` carries the portal base URL and the status-label table, or
        ``{"status": "error"}`` when anything fails (the error is logged).
    """
    try:
        tasks = get_task_by_deal(deal_id)
        # WEBHOOK is "<scheme>://<host>/...", so element 2 of the split is the host.
        host = WEBHOOK.split("/")[2]
        payload = {
            "status": "success",
            "result": tasks,
            "format": {"domain": "https://" + host, "statuses": statuses},
        }
    except Exception as err:
        logger.error(err)
        payload = {"status": "error"}
    return payload
# Directory holding the built front end; the same tree is served under /widget.
NUXT_DIST = Path("front/dist")
app.mount(
    "/widget",
    StaticFiles(directory="front/dist", html=True),
    name="static",
)
@app.post("/widget")
async def handle_widget_post(request: Request):
    """Serve the pre-built deal page with the requested deal ID substituted in.

    The deal ID is read from the ``PLACEMENT_OPTIONS`` field of the
    form-encoded body and replaces the ``data_deal_id:0`` placeholder in
    ``front/dist/deal/index.html``.

    Returns:
        The patched HTML with status 200; a JSON error with status 400 when
        the deal ID is missing or not a positive integer; a JSON error with
        status 404 when the HTML file does not exist.
    """
    # Imported locally because the module-level imports only bring in HTMLResponse.
    from fastapi.responses import JSONResponse

    body = await request.body()
    form = parse_qs(body.decode())
    logger.info(form)

    # The value is spliced into served HTML/JS, so accept nothing but an
    # integer. A hard-coded "deal_id = 49" debug override was removed here.
    try:
        deal_id = int(extract_id(form))
    except (TypeError, ValueError):
        deal_id = 0
    if deal_id <= 0:
        return JSONResponse({"error": "not deal id"}, status_code=400)

    html_file = NUXT_DIST / "deal/index.html"
    if not html_file.exists():
        # Returning "body, 404" is a Flask idiom; FastAPI would serialise the
        # tuple and answer 200, so the status is set explicitly.
        return JSONResponse({"error": "Nuxt.js page not found"}, status_code=404)

    html_content = html_file.read_text(encoding="utf-8")
    html_content = html_content.replace("data_deal_id:0", f"data_deal_id:{deal_id}")
    return HTMLResponse(content=html_content, status_code=200)