import os import boto3 import time import requests from django.conf import settings from celery import shared_task from celery.utils.log import get_task_logger from api.celery import celery_app from .models import TmcField logger = get_task_logger(__name__) aws_access_key_id = os.environ.get("AWS_ACCESS") aws_secret_access_key = os.environ.get("AWS_SECRET") @celery_app.task def upload_file(file_id): if not TmcField.objects.filter(file_id=file_id).exists(): return obj = TmcField.objects.get(file_id=file_id) logger.info(f"start upload file {file_id}") file_url = requests.get(f"https://api.telegram.org/bot{settings.TGBOT['TOKEN']}/getFile?file_id={file_id}") file_json = file_url.json() response = f"https://api.telegram.org/file/bot{settings.TGBOT['TOKEN']}/{file_json['result']['file_path']}" logger.info(f"get tg url {response}") r = requests.get(response, stream=True) s3 = boto3.client( service_name="s3", endpoint_url="https://s3.ru-1.storage.selcloud.ru", aws_access_key_id=settings.SELECTEL['access'], aws_secret_access_key=settings.SELECTEL['secret'], ) s3.upload_fileobj(r.raw, 'inventorization', file_id) obj.image_aws_url = file_id