from __future__ import annotations from datetime import datetime, timezone from pathlib import Path import oss2 from app.settings import settings class OSSUploader: def __init__(self) -> None: self.enabled = bool(settings.oss_enabled) if not self.enabled: self.bucket = None return if not all([ settings.oss_endpoint, settings.oss_bucket, settings.oss_access_key_id, settings.oss_access_key_secret, ]): raise RuntimeError("OSS is enabled but endpoint/bucket/ak/sk is not fully configured") auth = oss2.Auth(settings.oss_access_key_id, settings.oss_access_key_secret) self.bucket = oss2.Bucket(auth, settings.oss_endpoint, settings.oss_bucket) @staticmethod def _safe_name(name: str) -> str: return name.replace("\\", "_").replace("/", "_") def _key(self, dispatch_id: str, filename: str) -> str: date_part = datetime.now(timezone.utc).strftime("%Y%m%d") safe_file = self._safe_name(Path(filename).name) return f"{settings.oss_prefix.strip('/')}/{date_part}/{dispatch_id}/{safe_file}" def _public_url(self, key: str) -> str: if settings.oss_public_base_url: return f"{settings.oss_public_base_url.rstrip('/')}/{key}" endpoint = settings.oss_endpoint.rstrip("/") if endpoint.startswith("http://") or endpoint.startswith("https://"): return f"{endpoint}/{key}" return f"https://{endpoint}/{key}" def upload_fileobj(self, dispatch_id: str, filename: str, fileobj) -> dict[str, str]: if not self.enabled or self.bucket is None: raise RuntimeError("OSS uploader is not enabled") key = self._key(dispatch_id, filename) fileobj.seek(0) self.bucket.put_object(key, fileobj) return { "filename": Path(filename).name, "object_key": key, "url": self._public_url(key), } oss_uploader = OSSUploader()