Files
AI_A4000/video_worker/app/oss_client.py
2026-04-07 01:34:49 +08:00

62 lines
2.0 KiB
Python

from __future__ import annotations
from datetime import datetime, timezone
from pathlib import Path
import oss2
from app.settings import settings
class OSSUploader:
def __init__(self) -> None:
self.enabled = bool(settings.oss_enabled)
if not self.enabled:
self.bucket = None
return
if not all([
settings.oss_endpoint,
settings.oss_bucket,
settings.oss_access_key_id,
settings.oss_access_key_secret,
]):
raise RuntimeError("OSS is enabled but endpoint/bucket/ak/sk is not fully configured")
auth = oss2.Auth(settings.oss_access_key_id, settings.oss_access_key_secret)
self.bucket = oss2.Bucket(auth, settings.oss_endpoint, settings.oss_bucket)
@staticmethod
def _safe_name(name: str) -> str:
return name.replace("\\", "_").replace("/", "_")
def _key(self, dispatch_id: str, filename: str) -> str:
date_part = datetime.now(timezone.utc).strftime("%Y%m%d")
safe_file = self._safe_name(Path(filename).name)
return f"{settings.oss_prefix.strip('/')}/{date_part}/{dispatch_id}/{safe_file}"
def _public_url(self, key: str) -> str:
if settings.oss_public_base_url:
return f"{settings.oss_public_base_url.rstrip('/')}/{key}"
endpoint = settings.oss_endpoint.rstrip("/")
if endpoint.startswith("http://") or endpoint.startswith("https://"):
return f"{endpoint}/{key}"
return f"https://{endpoint}/{key}"
def upload_fileobj(self, dispatch_id: str, filename: str, fileobj) -> dict[str, str]:
if not self.enabled or self.bucket is None:
raise RuntimeError("OSS uploader is not enabled")
key = self._key(dispatch_id, filename)
fileobj.seek(0)
self.bucket.put_object(key, fileobj)
return {
"filename": Path(filename).name,
"object_key": key,
"url": self._public_url(key),
}
oss_uploader = OSSUploader()