""" Manual invoice upload: save file, optionally run AI vision to extract amount/date. """ import io from datetime import date, datetime from pathlib import Path from typing import Any, Dict, Tuple from fastapi import UploadFile from backend.app.services.ai_service import extract_invoice_metadata FINANCE_BASE = Path("data/finance") ALLOWED_IMAGE = {".jpg", ".jpeg", ".png", ".webp"} ALLOWED_PDF = {".pdf"} def _current_month() -> str: return datetime.utcnow().strftime("%Y-%m") def _pdf_first_page_to_image(pdf_bytes: bytes) -> Tuple[bytes, str] | None: """Render first page of PDF to PNG bytes. Returns (bytes, 'image/png') or None on error.""" try: import fitz doc = fitz.open(stream=pdf_bytes, filetype="pdf") if doc.page_count == 0: doc.close() return None page = doc[0] pix = page.get_pixmap(dpi=150) png_bytes = pix.tobytes("png") doc.close() return (png_bytes, "image/png") except Exception: return None async def process_invoice_upload( file: UploadFile, ) -> Tuple[str, str, str, float | None, date | None]: """ Save uploaded file to data/finance/{YYYY-MM}/manual/, run OCR for amount/date. Returns (file_name, file_path, month_str, amount, billing_date). """ month_str = _current_month() manual_dir = FINANCE_BASE / month_str / "manual" manual_dir.mkdir(parents=True, exist_ok=True) raw = await file.read() filename = file.filename or "upload" suf = Path(filename).suffix.lower() if suf in ALLOWED_IMAGE: image_bytes, mime = raw, (file.content_type or "image/jpeg") if "png" in (suf or ""): mime = "image/png" amount, date_str = await extract_invoice_metadata(image_bytes, mime) elif suf in ALLOWED_PDF: image_result = _pdf_first_page_to_image(raw) if image_result: image_bytes, mime = image_result amount, date_str = await extract_invoice_metadata(image_bytes, mime) else: amount, date_str = None, None # Save original PDF else: amount, date_str = None, None # Unique filename dest = manual_dir / filename counter = 1 while dest.exists(): dest = manual_dir / f"{dest.stem}_{counter}{dest.suffix}" counter += 1 dest.write_bytes(raw) file_path = str(dest) file_name = dest.name billing_date = None if date_str: try: billing_date = date.fromisoformat(date_str) except ValueError: pass if billing_date is None: billing_date = date.today() return (file_name, file_path, month_str, amount, billing_date)