import asyncio
import email
import email.message
import email.utils
import imaplib
import os
from datetime import datetime
from email.header import decode_header
from pathlib import Path
from typing import Any, Dict, List, Tuple

from backend.app.db import SessionLocal
from backend.app.models import FinanceRecord

# Attachments are stored as FINANCE_BASE_DIR/<YYYY-MM>/<doc_type>/<file>.
FINANCE_BASE_DIR = Path("data/finance")

# Subject keywords per document type. Subjects are lower-cased before matching,
# so ASCII keywords here must be lower-case.
_INVOICE_KEYWORDS = ("发票", "开票", "票据", "invoice")
_BANK_RECORD_KEYWORDS = (
    "流水",
    "活期",
    "活期明细",
    "对公",
    "明细",
    "回单",
    "bank",
    "对账单",
    "statement",
)
_RECEIPT_KEYWORDS = ("回执", "receipt")


def _decode_header_value(value: str | None) -> str:
    """Decode an RFC 2047 encoded header value into a plain string.

    Returns "" for a missing or empty header. Undecodable bytes are dropped,
    and a charset label that Python's codec registry does not recognise falls
    back to UTF-8 instead of raising LookupError.
    """
    if not value:
        return ""
    pieces: List[str] = []
    for text, enc in decode_header(value):
        if isinstance(text, bytes):
            try:
                pieces.append(text.decode(enc or "utf-8", errors="ignore"))
            except LookupError:
                pieces.append(text.decode("utf-8", errors="ignore"))
        else:
            pieces.append(text)
    return "".join(pieces)


def _classify_type(subject: str) -> str:
    """
    Classify finance document type based on subject keywords.

    Returns "invoices", "bank_records", "receipts" or "others"; the first
    matching category in that order wins. Matching is case-insensitive.
    """
    subject_lower = subject.lower()
    # Invoices / invoicing documents
    if any(k in subject_lower for k in _INVOICE_KEYWORDS):
        return "invoices"
    # Bank transaction records / account details / corporate current accounts, etc.
    if any(k in subject_lower for k in _BANK_RECORD_KEYWORDS):
        return "bank_records"
    if any(k in subject_lower for k in _RECEIPT_KEYWORDS):
        return "receipts"
    return "others"


def _ensure_month_dir(month_str: str, doc_type: str) -> Path:
    """Create (if needed) and return FINANCE_BASE_DIR/<month_str>/<doc_type>."""
    month_dir = FINANCE_BASE_DIR / month_str / doc_type
    month_dir.mkdir(parents=True, exist_ok=True)
    return month_dir


def _parse_email_date(msg: email.message.Message) -> datetime:
    """Return the message's Date header as a naive local-time datetime.

    Falls back to the current local time when the header is missing,
    unparsable, or outside the platform's timestamp range, so both paths
    produce local time (the original fallback used UTC).
    """
    raw_date = msg.get("Date")
    date_tuple = email.utils.parsedate_tz(str(raw_date)) if raw_date else None
    if date_tuple:
        try:
            return datetime.fromtimestamp(email.utils.mktime_tz(date_tuple))
        except (OverflowError, OSError, ValueError):
            # Timestamp not representable on this platform: deliberately
            # fall through to "now" rather than abort the whole sync.
            pass
    return datetime.now()


def _safe_filename(name: str) -> str:
    """Reduce an untrusted attachment name to a bare file name.

    Directory components ("/" or "\\" separated) and NUL characters are
    removed so the result cannot escape the target directory. Returns "" when
    nothing usable remains.
    """
    base = Path(name.replace("\x00", "").replace("\\", "/")).name
    return "" if base in ("", ".", "..") else base


def _unique_path(directory: Path, filename: str) -> Path:
    """Return directory/filename, or directory/<stem>_<n><suffix> using the
    smallest n >= 1 for which no file exists yet."""
    candidate = directory / filename
    stem, suffix = candidate.stem, candidate.suffix
    counter = 1
    while candidate.exists():
        # Always derive from the ORIGINAL stem, so we get a_1, a_2, ...
        # rather than a_1, a_1_2, a_1_2_3.
        candidate = directory / f"{stem}_{counter}{suffix}"
        counter += 1
    return candidate


def _save_attachment(
    msg: email.message.Message,
    month_str: str,
    doc_type: str,
) -> List[Tuple[str, str]]:
    """
    Save attachments of ``msg`` under FINANCE_BASE_DIR/<month_str>/<doc_type>.

    A part is written only if its Content-Disposition is "attachment", it has
    a filename, its MIME main type is application/* (e.g. PDF) or image/*, and
    its decoded payload is non-empty. Existing files are never overwritten.
    The target directory is created even if nothing ends up being saved.

    Returns:
        A list of (file_name, file_path) pairs. file_name is the sanitized
        base name of the attachment. file_path is where the bytes were written.
    """
    saved: List[Tuple[str, str]] = []
    base_dir = _ensure_month_dir(month_str, doc_type)

    for part in msg.walk():
        # get_content_disposition() yields the lower-cased disposition value
        # (or None), unlike a substring test on the raw header.
        if part.get_content_disposition() != "attachment":
            continue

        filename = _safe_filename(_decode_header_value(part.get_filename()))
        if not filename:
            continue

        # Keep application/* (PDFs and other documents) and image/* parts only.
        if part.get_content_maintype() not in ("application", "image"):
            continue

        data = part.get_payload(decode=True)
        if not data:
            continue

        file_path = _unique_path(base_dir, filename)
        with open(file_path, "wb") as f:
            f.write(data)
        saved.append((filename, str(file_path)))

    return saved


async def sync_finance_emails() -> List[Dict[str, Any]]:
    """
    Connect to IMAP, fetch unread finance-related emails, download attachments,
    save to filesystem and record FinanceRecord entries.

    Settings come from environment variables:
    - IMAP_HOST, IMAP_USER, IMAP_PASSWORD: required.
    - IMAP_PORT: default 993.
    - IMAP_MAILBOX: default "INBOX".

    The blocking work runs in a worker thread via asyncio.to_thread.

    Unread messages whose subject classifies as "others" are left unread.
    Every other unread message is marked Seen and Flagged once the database
    commit has succeeded, even if it had no saveable attachments.

    Returns:
        One dict per saved attachment with keys "id", "month", "type",
        "file_name" and "file_path".

    Raises:
        RuntimeError: if IMAP_HOST, IMAP_USER or IMAP_PASSWORD is not set.
        imaplib.IMAP4.error: if the mailbox cannot be selected, or on other
            IMAP protocol errors.
    """

    def _sync() -> List[Dict[str, Any]]:
        host = os.getenv("IMAP_HOST")
        user = os.getenv("IMAP_USER")
        password = os.getenv("IMAP_PASSWORD")
        port = int(os.getenv("IMAP_PORT", "993"))
        mailbox = os.getenv("IMAP_MAILBOX", "INBOX")

        if not all([host, user, password]):
            raise RuntimeError("IMAP_HOST, IMAP_USER, IMAP_PASSWORD must be set.")

        results: List[Dict[str, Any]] = []
        with imaplib.IMAP4_SSL(host, port) as imap:
            imap.login(user, password)
            status, _ = imap.select(mailbox)
            if status != "OK":
                # Same exception type a later SEARCH would raise, but clearer.
                raise imap.error(f"Cannot select mailbox {mailbox!r}")

            # IMAP SEARCH is limited; search UNSEEN here and filter by subject in Python.
            status, data = imap.search(None, "UNSEEN")
            if status != "OK":
                return results

            processed_ids: List[bytes] = []
            db = SessionLocal()
            try:
                for msg_id in (data[0] or b"").split():
                    # BODY.PEEK[] returns the full message without setting
                    # the Seen flag (RFC822 would), so skipped messages stay unread.
                    status, msg_data = imap.fetch(msg_id, "(BODY.PEEK[])")
                    if status != "OK" or not msg_data or not isinstance(msg_data[0], tuple):
                        continue
                    msg = email.message_from_bytes(msg_data[0][1])

                    subject = _decode_header_value(msg.get("Subject"))
                    doc_type = _classify_type(subject)
                    if doc_type == "others":
                        continue

                    month_str = _parse_email_date(msg).strftime("%Y-%m")
                    for file_name, file_path in _save_attachment(msg, month_str, doc_type):
                        record = FinanceRecord(
                            month=month_str,
                            type=doc_type,
                            file_name=file_name,
                            file_path=file_path,
                        )
                        # NOTE: created_at defaults at DB layer
                        db.add(record)
                        db.flush()  # needed so record.id is populated below
                        results.append(
                            {
                                "id": record.id,
                                "month": record.month,
                                "type": record.type,
                                "file_name": record.file_name,
                                "file_path": record.file_path,
                            }
                        )
                    processed_ids.append(msg_id)

                db.commit()
            finally:
                db.close()

            # Mark emails only after the commit succeeded. If the commit
            # fails, the messages stay unread and are retried on the next sync.
            for msg_id in processed_ids:
                imap.store(msg_id, "+FLAGS", "\\Seen \\Flagged")

        return results

    return await asyncio.to_thread(_sync)


async def create_monthly_zip(month_str: str) -> str:
    """
    Zip the finance folder for a given month (YYYY-MM) and return the zip path.

    The archive is written to FINANCE_BASE_DIR/<month_str>.zip, replacing any
    previous archive. Member names are relative to FINANCE_BASE_DIR, so they
    start with "<month_str>/".

    Raises:
        ValueError: if month_str is not a single plain path component
            (empty, ".", "..", or containing a path separator).
        FileNotFoundError: if FINANCE_BASE_DIR/<month_str> is not a directory.
    """
    import zipfile

    # Prevent month_str from pointing (or writing the zip) outside FINANCE_BASE_DIR.
    if month_str in ("", "..") or Path(month_str).name != month_str:
        raise ValueError(f"Invalid month: {month_str!r}")

    def _zip() -> str:
        month_dir = FINANCE_BASE_DIR / month_str
        if not month_dir.is_dir():
            raise FileNotFoundError(f"Finance directory for {month_str} not found.")

        # month_dir exists, so FINANCE_BASE_DIR does too; no mkdir needed.
        zip_path = FINANCE_BASE_DIR / f"{month_str}.zip"
        with zipfile.ZipFile(zip_path, "w", compression=zipfile.ZIP_DEFLATED) as zf:
            for root, _, files in os.walk(month_dir):
                for file in files:
                    full_path = Path(root) / file
                    zf.write(full_path, arcname=full_path.relative_to(FINANCE_BASE_DIR))
        return str(zip_path)

    return await asyncio.to_thread(_zip)