to_inventory/back/tgbot/tasks.py

38 lines
1.2 KiB
Python

import os
import boto3
import time
import requests
from django.conf import settings
from celery import shared_task
from celery.utils.log import get_task_logger
from api.celery import celery_app
from .models import TmcField
# Celery-aware logger named after this module.
logger = get_task_logger(__name__)

# AWS credentials read from the process environment; each is None when the
# variable is unset. NOTE(review): upload_file in this module takes its S3
# credentials from settings.SELECTEL rather than from these names — confirm
# whether anything else still imports them.
aws_access_key_id = os.getenv("AWS_ACCESS")
aws_secret_access_key = os.getenv("AWS_SECRET")
# (connect, read) timeouts in seconds for the Telegram HTTP calls, so that a
# stalled connection cannot block a Celery worker indefinitely.
_TELEGRAM_TIMEOUT = (5, 60)


@celery_app.task
def upload_file(file_id):
    """Copy a Telegram file into the ``inventorization`` S3 bucket.

    Looks up the ``TmcField`` row with the given ``file_id``, asks the
    Telegram Bot API for the file's download path, streams the file body
    into S3 under the key ``file_id`` and stores that key on the row's
    ``image_aws_url`` attribute.

    Args:
        file_id: Telegram file identifier; also used as the S3 object key.

    Returns:
        None. The task exits silently when no ``TmcField`` row has this
        ``file_id``.

    Raises:
        requests.RequestException: on timeout, connection failure or a
            non-2xx response from Telegram.
        KeyError: if the ``getFile`` response has no ``result.file_path``.
    """
    # Single query instead of exists() + get(): one round trip and no window
    # in which the row can vanish between the two calls.
    try:
        obj = TmcField.objects.get(file_id=file_id)
    except TmcField.DoesNotExist:
        return

    logger.info("start upload file %s", file_id)
    token = settings.TGBOT['TOKEN']

    # Let requests build the query string so file_id is URL-escaped.
    meta = requests.get(
        f"https://api.telegram.org/bot{token}/getFile",
        params={"file_id": file_id},
        timeout=_TELEGRAM_TIMEOUT,
    )
    meta.raise_for_status()
    file_path = meta.json()['result']['file_path']
    # Log only the path: the full download URL embeds the bot token and must
    # not end up in the logs.
    logger.info("get tg file path %s", file_path)

    s3 = boto3.client(
        service_name="s3",
        endpoint_url="https://s3.ru-1.storage.selcloud.ru",
        aws_access_key_id=settings.SELECTEL['access'],
        aws_secret_access_key=settings.SELECTEL['secret'],
    )

    # The context manager releases the streamed connection even if the
    # upload fails part-way through.
    with requests.get(
        f"https://api.telegram.org/file/bot{token}/{file_path}",
        stream=True,
        timeout=_TELEGRAM_TIMEOUT,
    ) as download:
        # Fail here rather than uploading an error body as if it were the file.
        download.raise_for_status()
        # Make the raw stream undo any Content-Encoding (gzip/deflate) so the
        # stored object holds the actual file bytes.
        download.raw.decode_content = True
        s3.upload_fileobj(download.raw, 'inventorization', file_id)

    obj.image_aws_url = file_id
    # Persist the new key; without a save the assignment above is lost.
    # Assumes image_aws_url is a concrete model field — confirm against
    # TmcField. Limiting the write to this column avoids clobbering other
    # fields changed while the download was in progress.
    obj.save(update_fields=["image_aws_url"])