import json import os import pprint import requests from dotenv import load_dotenv from fastapi import FastAPI, HTTPException from fastapi.staticfiles import StaticFiles from fastapi.middleware.cors import CORSMiddleware import logging from helpers import flatten_dict load_dotenv() logging.basicConfig( level=logging.INFO, format="%(asctime)s %(levelname)s %(filename)s:%(lineno)d %(funcName)s %(message)s", ) logger = logging.getLogger(__name__) origins = [ "http://localhost", "http://localhost:3010", "http://localhost:4173", "http://localhost:5173", "http://localhost:8000", ] vtk = json.loads(os.getenv("VTK_KEYS")) vtk_org = {} BX_API_CALL = os.getenv("BX_API_CALL") TG_TOKEN = os.getenv("TG_TOKEN") ODATA_AUTH = os.getenv("ODATA_AUTH") api_app = FastAPI(title="api app") @api_app.get("/") def test(): return {"app": "service monitoring api"} @api_app.get("/vtk/{item_id}") def get_vtk(item_id): if not item_id in vtk: raise HTTPException(status_code=404, detail="Item not found") try: if not item_id in vtk_org: res = requests.get( "https://my.vendotek.com/api/v1/org", headers={"Authorization": f"Bearer {vtk[item_id]}"}, ) data = res.json() org_id = data[0]["name"] vtk_org[item_id] = org_id res_unit = requests.get( f"https://my.vendotek.com/api/v1/org/{vtk_org[item_id]}/unit", headers={"Authorization": f"Bearer {vtk[item_id]}"}, ) data_unit = res_unit.json() result = [ { "sn": d["sn"], "unit_id": d["unit_id"], "last_online": d["last_seen_at"], "name": ", ".join( list(filter(None, [d["location_name"], d["address"], d["city"]])) ), "ip": list( filter( lambda item: item["name"] == "IP address", list( filter( lambda item: item["id"] == "LAN", list( filter( lambda item: item["id"] == "Communications", d["modules"], ) )[0]["modules"], ) )[0]["details"], ) )[0]["value"], } for d in data_unit ] return {"status": "success", "data": result} except Exception as e: logger.error(e) raise HTTPException(status_code=500, detail=str(e)) @api_app.get("/tg/get_me") async def get_tg_data(): try: url = f"https://api.telegram.org/bot{TG_TOKEN}/getMe" res = requests.get(url) logger.info(url) data = res.json() if data["ok"] == False: raise HTTPException(status_code=500, detail=data["description"]) return {"status": "success", "data": data["result"]} except Exception as e: logger.error(e) raise HTTPException(status_code=500, detail=str(e)) @api_app.get("/tg/get_updates") async def get_tg_data(): try: url = f"https://api.telegram.org/bot{TG_TOKEN}/getUpdates?offset=-1" res = requests.get(url) logger.info(url) data = res.json() if data["ok"] == False: raise HTTPException(status_code=500, detail=data["description"]) return {"status": "success", "data": data["result"]} except Exception as e: logger.error(e) raise HTTPException(status_code=500, detail=str(e)) @api_app.get("/bx/get_users") async def get_bx_users(): try: url = f"{BX_API_CALL}user.get.json" res = requests.get(url) logger.info(url) data = res.json() return { "status": "success", "data": [ v for v in data["result"] if v["UF_DEPARTMENT"][0] not in [15, 16] ], } except Exception as e: logger.error(e) raise HTTPException(status_code=500, detail=str(e)) @api_app.get("/nomen") async def get_parent_nomen(): try: parent_ids = [ "a6aadfe8-7e0f-11ee-ab5a-a47a2bd811cb", "8a988ff-0911-11ee-ab4e-e3e667c628bd", "a0fb742f-09f5-11ee-ab4e-e3e667c628bd", ] url = "https://1c.svs-tech.pro/UNF/odata/standard.odata/Catalog_КатегорииНоменклатуры?$format=json" url += "&" url + " or ".join([f"Ref_Key eq guid'{p}'" for p in parent_ids]) url += "&$select=Description,Ref_Key" res = requests.get(url, headers={"Authorization": ODATA_AUTH}) data = res.json() return { "status": "success", "data": [ {"label": d["Description"], "id": d["Ref_Key"]} for d in data["value"] ], } except Exception as e: logger.error(e) raise HTTPException(status_code=500, detail=str(e)) @api_app.get("/nomen/{nomen_id}") async def get_nomen_items(nomen_id): try: url = "https://1c.svs-tech.pro/UNF/odata/standard.odata/Catalog_Номенклатура?$format=json" url += "&$select=Ref_Key,Description" url += f"&$filter=КатегорияНоменклатуры_Key eq guid'{nomen_id}'" res = requests.get(url, headers={"Authorization": ODATA_AUTH}) data = res.json() return { "status": "success", "data": [ {"label": d["Description"], "id": d["Ref_Key"]} for d in data["value"] ], } except Exception as e: logger.error(e) raise HTTPException(status_code=500, detail=str(e)) @api_app.get("/nomen_item/{item_id}") async def get_nomen_items(item_id): try: url = f"https://1c.svs-tech.pro/UNF/odata/standard.odata/Catalog_Номенклатура(guid'{item_id}')?$format=json" res = requests.get(url, headers={"Authorization": ODATA_AUTH}) data = res.json() return { "status": "success", "data": data, } except Exception as e: logger.error(e) raise HTTPException(status_code=500, detail=str(e)) app = FastAPI(title="main app") app.exception_handler(404) async def return_error(): return {"error": "custom error"} app.add_middleware( CORSMiddleware, allow_origins=origins, allow_credentials=True, allow_methods=["*"], allow_headers=["*"], ) app.mount("/api", api_app) app.mount("/", StaticFiles(directory="front/.output/public", html=True), name="front")