108 lines
3.9 KiB
Python
108 lines
3.9 KiB
Python
import logging
import urllib
import urllib.parse
import uuid

import requests
from django.conf import settings
from rest_framework import status
from rest_framework import viewsets
from rest_framework.decorators import action
from rest_framework.response import Response

from .models import Element, Partner
from .serializers import PartnerSerializer, ElementSerializer
|
||
|
||
# Module-level logger; the viewsets in this module use it to record failures
# of their requests to the remote OData service.
logger = logging.getLogger("root")
|
||
|
||
|
||
class PartnerViewSet(viewsets.ModelViewSet):
    """
    API endpoint that allows partners to be viewed or edited.

    In addition to the standard model routes it exposes ``external``, which
    relays the contents of the remote ``Catalog_Контрагенты`` OData catalog.
    """

    queryset = Partner.objects.all()
    serializer_class = PartnerSerializer

    # Seconds to wait for the remote OData service. Without a timeout
    # ``requests`` waits forever and a stalled upstream would pin a worker.
    _remote_timeout = 30

    @action(detail=False, methods=["get"], url_path=r"external")
    def get_remote_partners(self, request):
        """
        Return the entries of the remote ``Catalog_Контрагенты`` catalog.

        Only rows matching ``Недействителен eq false`` are requested, and only
        the ``НаименованиеПолное``, ``Description`` and ``Ref_Key`` fields of
        each row are selected.

        Args:
            request: the incoming DRF request (not inspected).

        Returns:
            Response: the ``value`` list of the OData payload on success, or
            an empty HTTP 500 response when the remote call fails, answers
            with an error status, or returns a payload without ``value``.
        """
        params = {
            "$format": "json",
            "$select": ",".join(["НаименованиеПолное", "Description", "Ref_Key"]),
            "$filter": "Недействителен eq false",
        }
        # Percent-encode the values ourselves so reserved characters can never
        # leak into the query structure. ``,`` and ``'`` stay literal because
        # they are part of the OData syntax; the ``$``-prefixed option names
        # are sent as-is.
        query = "&".join(
            f"{name}={urllib.parse.quote(value, safe=chr(44) + chr(39))}"
            for name, value in params.items()
        )
        remote_url = (
            "https://1c.svs-tech.pro/UNF/odata/standard.odata/Catalog_Контрагенты?"
            + query
        )
        try:
            # The request itself lives inside the ``try`` so that connection
            # errors and timeouts are reported like every other failure.
            data = requests.get(
                remote_url,
                headers={"Authorization": settings.ODATA_AUTH},
                timeout=self._remote_timeout,
            )
            data.raise_for_status()
            parsed_data = data.json()
            return Response(parsed_data["value"])
        except Exception:
            # View boundary: log with traceback and answer with a plain 500.
            logger.exception("Failed to fetch remote partners")
            return Response(status=status.HTTP_500_INTERNAL_SERVER_ERROR)
|
||
|
||
|
||
class ElementViewSet(viewsets.ModelViewSet):
    """
    API endpoint that allows elements to be viewed or edited.

    In addition to the standard model routes it exposes two read-only actions
    that relay data from the remote OData service: ``external_categories``
    and ``external/<cat_id>``.
    """

    queryset = Element.objects.all()
    serializer_class = ElementSerializer

    # Base URL of the remote OData service; catalog names are appended to it.
    _odata_root = "https://1c.svs-tech.pro/UNF/odata/standard.odata/"
    # ``Ref_Key`` of the category whose descendants ``external_categories``
    # walks.
    _root_category_key = "a6aadfe8-7e0f-11ee-ab5a-a47a2bd811cb"
    # Seconds to wait for the remote OData service. Without a timeout
    # ``requests`` waits forever and a stalled upstream would pin a worker.
    _remote_timeout = 30

    def _fetch_odata(self, catalog, odata_filter):
        """
        Query one remote catalog and return the ``value`` list of the reply.

        Args:
            catalog: catalog name appended to the OData base URL.
            odata_filter: expression sent as the ``$filter`` query option.

        Returns:
            The object stored under ``value`` in the JSON reply.

        Raises:
            requests.RequestException: on connection errors, timeouts or a
                non-success HTTP status.
            ValueError: when the reply body is not valid JSON.
            KeyError: when the reply has no ``value`` member.
        """
        params = {
            "$format": "json",
            "$select": ",".join(["Description", "Ref_Key", "Parent_Key"]),
            "$filter": odata_filter,
        }
        # Percent-encode the values ourselves so reserved characters can never
        # leak into the query structure. ``,`` and ``'`` stay literal because
        # they are part of the OData syntax; the ``$``-prefixed option names
        # are sent as-is.
        query = "&".join(
            f"{name}={urllib.parse.quote(value, safe=chr(44) + chr(39))}"
            for name, value in params.items()
        )
        reply = requests.get(
            f"{self._odata_root}{catalog}?{query}",
            headers={"Authorization": settings.ODATA_AUTH},
            timeout=self._remote_timeout,
        )
        reply.raise_for_status()
        return reply.json()["value"]

    @action(detail=False, methods=["get"], url_path=r"external_categories")
    def get_remote_categories(self, request):
        """
        Return the second-level categories below the configured root category.

        Two remote queries are made: the first lists the categories whose
        ``Parent_Key`` is the root category, the second lists every category
        whose ``Parent_Key`` is one of those. Only the second list is returned.

        Args:
            request: the incoming DRF request (not inspected).

        Returns:
            Response: the list of second-level categories (empty when the root
            has no children), or an empty HTTP 500 response when a remote call
            or its parsing fails.
        """
        try:
            first_level = self._fetch_odata(
                "Catalog_КатегорииНоменклатуры",
                f"Parent_Key eq guid'{self._root_category_key}'",
            )
            if not first_level:
                # Joining zero clauses would send an empty ``$filter``;
                # there is nothing to look up, so answer directly.
                return Response([])

            children_filter = " or ".join(
                f"Parent_Key eq guid'{row['Ref_Key']}'" for row in first_level
            )
            return Response(
                self._fetch_odata("Catalog_КатегорииНоменклатуры", children_filter)
            )
        except Exception:
            # View boundary: log with traceback and answer with a plain 500.
            logger.exception("Failed to fetch remote categories")
            return Response(status=status.HTTP_500_INTERNAL_SERVER_ERROR)

    @action(detail=False, methods=["get"], url_path=r"external/(?P<cat_id>[^/.]+)")
    def get_remote_element(self, request, pk=None, cat_id=None):
        """
        Return the remote ``Catalog_Номенклатура`` rows under one category.

        Args:
            request: the incoming DRF request (not inspected).
            pk: unused; kept for signature compatibility.
            cat_id: GUID of the parent category, taken from the URL.

        Returns:
            Response: the rows whose ``Parent_Key`` equals ``cat_id``; an
            empty HTTP 400 response when ``cat_id`` is not a valid GUID; or an
            empty HTTP 500 response when the remote call or its parsing fails.
        """
        # ``cat_id`` comes straight from the URL and is embedded in the OData
        # filter, so accept nothing but a GUID and use its canonical form.
        try:
            category_key = str(uuid.UUID(cat_id))
        except (AttributeError, TypeError, ValueError):
            return Response(status=status.HTTP_400_BAD_REQUEST)

        try:
            return Response(
                self._fetch_odata(
                    "Catalog_Номенклатура",
                    f"Parent_Key eq guid'{category_key}'",
                )
            )
        except Exception:
            # View boundary: log with traceback and answer with a plain 500.
            logger.exception("Failed to fetch remote elements")
            return Response(status=status.HTTP_500_INTERNAL_SERVER_ERROR)
|