192 lines
5.9 KiB
Python
192 lines
5.9 KiB
Python
import asyncio
import json
import locale
import re
from datetime import datetime
from urllib.parse import parse_qs

import requests
from fastapi import FastAPI, Request
from fastapi.templating import Jinja2Templates

from app.constants import *
|
|
|
|
# ASGI application object and the Jinja2 environment rooted at ./templates.
app = FastAPI()
templates = Jinja2Templates(directory="templates")

# Best effort: switch LC_TIME to "ru_RU".  If the locale cannot be set the
# failure is only logged and module import continues with the current locale.
try:
    locale.setlocale(locale.LC_TIME, "ru_RU")
except Exception as exc:
    logger.error(exc)
|
|
|
|
|
|
# Custom Jinja2 filter for date formatting.
def format_datetime(value, format="%d %B %Y, %H:%M:%S"):
    """Format an ISO-8601 date string in a human-readable form.

    Args:
        value: A string accepted by ``datetime.fromisoformat``.
        format: ``strftime`` pattern applied to the parsed date.

    Returns:
        The formatted string.  If ``value`` cannot be parsed (wrong type or
        not ISO-8601) it is returned unchanged; if parsing succeeds but
        formatting fails, the parsed ``datetime`` is returned.
    """
    try:
        parsed = datetime.fromisoformat(value)
    except (TypeError, ValueError):
        # The original fallback returned a variable that is unbound on this
        # path; hand the raw input back instead of raising.
        return value

    try:
        return parsed.strftime(format)
    except (TypeError, ValueError):
        return parsed
|
|
|
|
|
|
# Display labels keyed by the status code rendered as a string.
statuses = {
    "1": "Новое",
    "2": "Ожидание",
    "3": "Выполняется",
    "4": "Проверка",
    "5": "Завершено",
    "6": "Отложено",
    "7": "Отклонено",
}


# Custom Jinja2 filter that maps a status code to its label.
def format_status(value):
    """Return the label for a status code.

    Args:
        value: Status code; it is converted with ``str`` before the lookup,
            so ``1`` and ``"1"`` are equivalent.

    Returns:
        The matching entry of ``statuses``, or ``str(value)`` when the code
        is not in the table.
    """
    key = str(value)
    return statuses.get(key, key)
|
|
|
|
# Register the custom filters on the Jinja2Templates environment.
templates.env.filters.update(
    {
        "format_status": format_status,
        "format_datetime": format_datetime,
    }
)
|
|
|
|
|
|
@app.get("/")
def read_root():
    """Return a static payload that names the service."""
    service_info = {"service": "BX integrations"}
    return service_info
|
|
|
|
|
|
@app.post("/integration")
async def intgr(request: Request):
    """Post a comment to a task for an incoming repository webhook.

    The JSON body must contain ``ref`` and ``repository.full_name``.  The last
    ``/``-separated segment of ``ref`` is treated as the branch name and must
    look like ``<tag>-<task number>[-<anything>]``; the task number selects the
    task that receives the comment.  The comment text is the head commit
    message when ``head_commit`` is present, otherwise a "branch created"
    notice.

    Returns:
        ``{"status": "success"}`` after the comment request was sent, or
        ``{"status": "error"}`` when the branch name has no task number.
    """
    timeout_s = 10  # never wait on the remote API indefinitely

    body = await request.json()
    ref = body["ref"]
    branch = ref.split("/")[-1]

    try:
        tag, number, *args = branch.split("-")
    except ValueError:
        # E.g. a push to "main": there is no task to comment on.
        logger.info(f"branch {branch} has no task number, skipping")
        return {"status": "error"}
    logger.info(f"input tag {tag} number {number} args {args}")

    # ``head_commit`` may be missing or null; both mean "no commit message".
    head_commit = body.get("head_commit")
    event_text = head_commit["message"] if head_commit else "Создана ветка"

    bx_comment = (
        f"Сообщение от GITEA для задачи {number} (ветка {ref} в репозитории {body['repository']['full_name']})"
        "\n\n"
        f"{event_text}"
    )

    # ``params`` URL-encodes the values, so '&', '#', spaces or newlines in a
    # commit message can no longer corrupt the query string.  ``requests`` is
    # synchronous, so it runs in a worker thread to keep the event loop free.
    bx_res = await asyncio.to_thread(
        requests.get,
        f"{WEBHOOK}/task.commentitem.add.json",
        params={"taskId": number, "fields[POST_MESSAGE]": bx_comment},
        timeout=timeout_s,
    )

    try:
        result = bx_res.json()
    except ValueError:
        # Non-JSON reply: log the raw text instead of failing after the
        # request has already been made.
        result = bx_res.text
    logger.info(f"result {result}")

    return {"status": "success"}
|
|
|
|
|
|
# Characters that Telegram's MarkdownV2 requires to be backslash-escaped in
# plain text, plus the backslash itself.
_MDV2_RESERVED = re.compile(r"([_*\[\]()~`>#+\-=|{}.!\\])")
# Bracketed tags such as [USER=1] or [/USER], and double quotes.
_TAGS_AND_QUOTES = re.compile(r"\[.*?\]|\"")


def _escape_markdown_v2(text):
    """Backslash-escape every MarkdownV2 reserved character in ``text``."""
    return _MDV2_RESERVED.sub(r"\\\1", str(text))


@app.post("/integration_tg/{bx_id}/{tg_id}")
async def tg_intgr_get(request: Request):
    """Forward a task comment to Telegram when it mentions user ``bx_id``.

    The form-encoded body must carry ``data[FIELDS_AFTER][TASK_ID]`` and
    ``data[FIELDS_AFTER][ID]``.  The comment is fetched via
    ``task.commentitem.get``; if its ``POST_MESSAGE`` contains
    ``[USER=<bx_id>]`` the task title is fetched as well and a message with
    the author, a link to the task and the first 100 space-separated words of
    the comment (bracketed tags and double quotes removed) is sent to chat
    ``tg_id``.

    Returns:
        ``{"status": "success"}`` when processing finished (whether or not a
        mention was found), ``{"status": "error"}`` on any exception.
    """
    timeout_s = 10  # never wait on a remote API indefinitely

    try:
        body = await request.body()
        query = parse_qs(body)

        bx_id = request.path_params["bx_id"]
        tg_id = request.path_params["tg_id"]

        task_id = query[b"data[FIELDS_AFTER][TASK_ID]"][0].decode()
        item_id = query[b"data[FIELDS_AFTER][ID]"][0].decode()

        # ``requests`` is synchronous: run it in a worker thread so the event
        # loop is not blocked, and let ``params`` do the URL-encoding.
        comment_res = await asyncio.to_thread(
            requests.get,
            f"{WEBHOOK}/task.commentitem.get.json",
            params={"taskId": task_id, "ITEMID": item_id},
            timeout=timeout_s,
        )
        comment_json = comment_res.json()
        post_message = comment_json["result"]["POST_MESSAGE"]

        # ``bx_id`` comes from the URL path; escape it so it is matched
        # literally instead of being interpreted as a regex.
        if not re.search(rf"\[USER={re.escape(bx_id)}\]", post_message):
            return {"status": "success"}

        logger.info(f"sending tg mention to {bx_id}")

        task_res = await asyncio.to_thread(
            requests.get,
            f"{WEBHOOK}/tasks.task.get.json",
            params={"taskId": task_id, "select[0]": "TITLE"},
            timeout=timeout_s,
        )
        task = task_res.json()["result"]["task"]

        plain_text = _TAGS_AND_QUOTES.sub("", post_message)
        comment_body = " ".join(plain_text.split(" ")[:100])

        # Dynamic text is escaped because MarkdownV2 rejects messages with
        # unescaped reserved characters such as '.', '-' or '!'.
        # NOTE(review): MarkdownV2 bold is *text*; confirm that the "**"
        # markers around the title render as intended.
        comment = (
            f"Упоминание от {_escape_markdown_v2(comment_json['result']['AUTHOR_NAME'])}\n"
            f"Задача {task['id']} [**{_escape_markdown_v2(task['title'])}**]"
            f"(https://crm.svs-tech.pro/company/personal/user/{bx_id}/tasks/task/view/{task_id}/)\n\n"
        ) + _escape_markdown_v2(comment_body)

        tg_res = await asyncio.to_thread(
            requests.get,
            f"https://api.telegram.org/bot{TOKEN}/sendMessage",
            params={"chat_id": tg_id, "text": comment, "parse_mode": "MarkdownV2"},
            timeout=timeout_s,
        )
        if not tg_res.ok:
            # Delivery failures used to be invisible; at least record them.
            logger.error(f"telegram sendMessage failed: {tg_res.status_code} {tg_res.text}")

        return {"status": "success"}
    except Exception as e:
        logger.error(e)
        return {"status": "error"}
|
|
|
|
|
|
def _extract_deal_id(params):
    """Return ``ID`` from the first PLACEMENT_OPTIONS JSON object that has one.

    ``params`` is a ``parse_qs`` result.  Entries that are not valid JSON are
    logged and skipped; entries that are not JSON objects are skipped.
    Returns ``None`` when no entry provides an ``ID``.
    """
    for raw in params.get("PLACEMENT_OPTIONS", []):
        try:
            data = json.loads(raw)
        except json.JSONDecodeError:
            logger.error(f"Ошибка: Некорректная JSON-строка - {raw}")
            continue
        if isinstance(data, dict) and "ID" in data:
            return data["ID"]
    return None


@app.post("/deal_tab")
async def deal_tab(
    request: Request,
    DOMAIN: str | None = None,
    PROTOCOL: str | None = None,
    LANG: str | None = None,
    APP_SID: str | None = None,
):
    """Render ``deal_tab.html`` with the tasks attached to a deal.

    The deal id is read from the ``PLACEMENT_OPTIONS`` field of the
    form-encoded body.  Up to 100 tasks whose ``UF_CRM_TASK`` equals
    ``D_<deal id>`` are requested via ``tasks.task.list`` and passed to the
    template together with the scheme-and-host part of ``WEBHOOK``.

    Args:
        request: Incoming request; its raw body is parsed as a query string.
        DOMAIN, PROTOCOL, LANG, APP_SID: Accepted but not used by the handler.

    Returns:
        The rendered template response, or ``{"status": "error"}`` if
        anything fails.
    """
    timeout_s = 10  # never wait on the remote API indefinitely

    try:
        body = await request.body()
        form = parse_qs(body.decode())

        # NOTE(review): 49 is a hard-coded fallback deal id kept from the
        # original code; it looks like a debugging default - confirm.
        deal_id = _extract_deal_id(form) or 49

        select = [
            "ID",
            "TITLE",
            "RESPONSIBLE_ID",
            "CREATED_DATE",
            "DEADLINE",
            "STATUS",
            "GROUP_ID",
        ]
        # A list of pairs lets the repeated "select[]" key through and has
        # ``requests`` URL-encode the request-supplied deal id.
        params = [("select[]", field) for field in select]
        params.append(("filter[UF_CRM_TASK]", f"D_{deal_id}"))
        params.append(("limit", 100))

        # ``requests`` is synchronous: run it in a worker thread so the event
        # loop is not blocked while waiting for the reply.
        task_res = await asyncio.to_thread(
            requests.get,
            f"{WEBHOOK}/tasks.task.list",
            params=params,
            timeout=timeout_s,
        )
        task_data_json = task_res.json()

        # "https://host/..." split on "/" gives the host at index 2.
        domain = f"https://{WEBHOOK.split('/')[2]}"

        return templates.TemplateResponse(
            request=request,
            name="deal_tab.html",
            context={"tasks": task_data_json["result"]["tasks"], "domain": domain},
        )
    except Exception as e:
        logger.error(e)
        return {"status": "error"}
|