734 lines
26 KiB
Python
734 lines
26 KiB
Python
import asyncio
|
||
import email
|
||
import hashlib
|
||
import imaplib
|
||
import logging
|
||
import os
|
||
import re
|
||
import sqlite3
|
||
import ssl
|
||
from datetime import date, datetime, timedelta
|
||
from email.header import decode_header
|
||
from pathlib import Path
|
||
from typing import Any, Dict, List, Tuple
|
||
|
||
# Register the IMAP "ID" command with imaplib's command table so that imaplib
# accepts it in the NONAUTH, AUTH and SELECTED connection states. The sync code
# in this module sends it to present itself as a desktop mail client
# (Foxmail/Outlook) for providers like NetEase/163.
imaplib.Commands["ID"] = ("NONAUTH", "AUTH", "SELECTED")
|
||
|
||
from sqlalchemy.orm import Session
|
||
|
||
from backend.app.db import SessionLocal
|
||
from backend.app.models import FinanceRecord
|
||
|
||
|
||
# Root directory under which synced finance documents are stored, grouped as
# <month>/<doc_type>/ (see _ensure_month_dir).
FINANCE_BASE_DIR = Path("data/finance")
# SQLite database that records already-synced attachments for de-duplication.
SYNC_DB_PATH = Path("data/finance/sync_history.db")

# Folder names for classification (invoices, receipts, statements)
INVOICES_DIR = "invoices"
RECEIPTS_DIR = "receipts"
STATEMENTS_DIR = "statements"
|
||
|
||
|
||
def _decode_header_value(value: str | None) -> str:
|
||
if not value:
|
||
return ""
|
||
parts = decode_header(value)
|
||
decoded = ""
|
||
for text, enc in parts:
|
||
if isinstance(text, bytes):
|
||
decoded += text.decode(enc or "utf-8", errors="ignore")
|
||
else:
|
||
decoded += text
|
||
return decoded
|
||
|
||
|
||
def _classify_type(subject: str, filename: str) -> str:
|
||
"""
|
||
Classify finance document type. Returns: invoices, receipts, statements, others.
|
||
Maps to folders: invoices/, receipts/, statements/.
|
||
"""
|
||
text = f"{subject} {filename}".lower()
|
||
# 发票 / 开票类
|
||
if any(k in text for k in ["发票", "开票", "票据", "invoice", "fapiao"]):
|
||
return "invoices"
|
||
# 回执
|
||
if any(k in text for k in ["回执", "签收单", "receipt"]):
|
||
return "receipts"
|
||
# 银行流水 / 账户明细 / 对公活期等
|
||
if any(
|
||
k in text
|
||
for k in [
|
||
"流水",
|
||
"活期",
|
||
"活期明细",
|
||
"对公",
|
||
"明细",
|
||
"回单",
|
||
"bank",
|
||
"对账单",
|
||
"statement",
|
||
]
|
||
):
|
||
return "statements"
|
||
return "others"
|
||
|
||
|
||
def _ensure_month_dir(month_str: str, doc_type: str) -> Path:
    """Return ``FINANCE_BASE_DIR/<month_str>/<doc_type>``, creating it if needed."""
    target = FINANCE_BASE_DIR.joinpath(month_str, doc_type)
    target.mkdir(parents=True, exist_ok=True)
    return target
|
||
|
||
|
||
def _parse_email_date(msg: email.message.Message) -> datetime:
|
||
date_tuple = email.utils.parsedate_tz(msg.get("Date"))
|
||
if date_tuple:
|
||
dt = datetime.fromtimestamp(email.utils.mktime_tz(date_tuple))
|
||
else:
|
||
dt = datetime.utcnow()
|
||
return dt
|
||
|
||
|
||
def _run_invoice_ocr_sync(file_path: str, mime: str, raw_bytes: bytes) -> Tuple[float | None, str | None]:
    """Run the async ``extract_invoice_metadata`` from synchronous code.

    For PDFs (detected by MIME type or ``.pdf`` suffix) the first page is
    converted to an image via ``_pdf_first_page_to_image``. If that conversion
    returns nothing, the original bytes and MIME type are passed through.

    Args:
        file_path: Path of the saved attachment (only its suffix is inspected).
        mime: MIME type reported for the attachment.
        raw_bytes: Attachment content.

    Returns:
        Whatever ``extract_invoice_metadata`` returns; the caller unpacks it
        as ``(amount, date_string)``.

    Raises:
        RuntimeError: If called from a thread that already runs an event loop.
    """
    from backend.app.services.ai_service import extract_invoice_metadata
    from backend.app.services.invoice_upload import _pdf_first_page_to_image

    is_pdf = "pdf" in (mime or "").lower() or Path(file_path).suffix.lower() == ".pdf"
    if is_pdf:
        rendered = _pdf_first_page_to_image(raw_bytes)
        if rendered:
            raw_bytes, mime = rendered
        # Otherwise keep raw_bytes and try anyway (may fail).

    # asyncio.run creates a fresh loop, closes it and un-sets it afterwards,
    # so no closed loop is left installed as the thread's current loop.
    return asyncio.run(extract_invoice_metadata(raw_bytes, mime))
|
||
|
||
|
||
def _extract_text_for_tagging(file_path: str, mime: str, raw_bytes: bytes) -> str:
|
||
"""
|
||
Extract best-effort text from PDF/image/xlsx for tagging.
|
||
- PDF: extract text via fitz; fallback to first page OCR image (handled elsewhere if needed)
|
||
- Image: no local OCR here; return empty and let AI decide (optional)
|
||
- XLSX: not parsed currently
|
||
"""
|
||
p = Path(file_path)
|
||
suf = p.suffix.lower()
|
||
if suf == ".pdf" or "pdf" in (mime or "").lower():
|
||
try:
|
||
import fitz # PyMuPDF
|
||
doc = fitz.open(stream=raw_bytes, filetype="pdf")
|
||
texts: list[str] = []
|
||
for i in range(min(5, doc.page_count)):
|
||
texts.append(doc.load_page(i).get_text("text") or "")
|
||
doc.close()
|
||
return "\n".join(texts).strip()
|
||
except Exception:
|
||
return ""
|
||
return ""
|
||
|
||
|
||
def _rename_invoice_file(
|
||
file_path: str,
|
||
amount: float | None,
|
||
billing_date: date | None,
|
||
) -> Tuple[str, str]:
|
||
"""
|
||
Rename invoice file to YYYYMMDD_金额_原文件名.
|
||
Returns (new_file_name, new_file_path).
|
||
"""
|
||
path = Path(file_path)
|
||
if not path.exists():
|
||
return (path.name, file_path)
|
||
date_str = (billing_date or date.today()).strftime("%Y%m%d")
|
||
amount_str = f"{amount:.2f}" if amount is not None else "0.00"
|
||
# Sanitize original name: take stem, limit length
|
||
orig_stem = path.stem[: 80] if len(path.stem) > 80 else path.stem
|
||
suffix = path.suffix
|
||
new_name = f"{date_str}_{amount_str}_{orig_stem}{suffix}"
|
||
new_path = path.parent / new_name
|
||
counter = 1
|
||
while new_path.exists():
|
||
new_path = path.parent / f"{date_str}_{amount_str}_{orig_stem}_{counter}{suffix}"
|
||
counter += 1
|
||
path.rename(new_path)
|
||
return (new_path.name, str(new_path))
|
||
|
||
|
||
def _ensure_sync_history_table(conn: sqlite3.Connection) -> None:
|
||
conn.execute(
|
||
"""
|
||
CREATE TABLE IF NOT EXISTS attachment_history (
|
||
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
||
message_id TEXT,
|
||
file_hash TEXT NOT NULL,
|
||
month TEXT,
|
||
doc_type TEXT,
|
||
file_name TEXT,
|
||
file_path TEXT,
|
||
created_at TEXT DEFAULT CURRENT_TIMESTAMP,
|
||
UNIQUE(message_id, file_hash)
|
||
)
|
||
"""
|
||
)
|
||
conn.commit()
|
||
|
||
|
||
def _has_sync_history() -> bool:
    """Report whether any attachment has been synced before.

    No history means this is treated as a first sync (fetch everything);
    existing history means only incremental (UNSEEN) fetching is needed.

    Returns:
        ``True`` if ``attachment_history`` holds at least one row; ``False``
        if the database file is missing, empty, or cannot be queried.
    """
    if SYNC_DB_PATH.exists():
        try:
            conn = sqlite3.connect(SYNC_DB_PATH)
            try:
                first_row = conn.execute(
                    "SELECT 1 FROM attachment_history LIMIT 1"
                ).fetchone()
            finally:
                conn.close()
            return first_row is not None
        except Exception:
            return False
    return False
|
||
|
||
|
||
def _save_attachment(
    msg: email.message.Message,
    month_str: str,
    allowed_doc_types: set[str] | None = None,
) -> List[Tuple[str, str, str, bytes, str]]:
    """Save the PDF/image/spreadsheet attachments of one message to disk.

    Many mails carry attachments as ``inline`` parts or without any
    Content-Disposition, so any part that has a file name with an accepted
    extension is treated as a downloadable attachment.

    De-duplication uses ``SYNC_DB_PATH``: ``(Message-ID, MD5(content))`` is
    the unique key, so an attachment already recorded for the same message is
    skipped.

    Args:
        msg: Parsed email message.
        month_str: Month folder name the files are stored under.
        allowed_doc_types: If given, only attachments classified as one of
            these types are saved.

    Returns:
        List of ``(file_name, file_path, mime, raw_bytes, doc_type)`` for each
        newly saved attachment. ``raw_bytes`` is kept so the caller can run
        invoice OCR without re-reading the file.
    """
    accepted_exts = (".pdf", ".jpg", ".jpeg", ".png", ".webp", ".xlsx", ".xls")
    saved: List[Tuple[str, str, str, bytes, str]] = []

    msg_id = str(msg.get("Message-ID") or "")
    subject = _decode_header_value(msg.get("Subject"))

    SYNC_DB_PATH.parent.mkdir(parents=True, exist_ok=True)
    conn = sqlite3.connect(SYNC_DB_PATH)
    try:
        _ensure_sync_history_table(conn)

        for part in msg.walk():
            filename = _decode_header_value(part.get_filename())
            if not filename:
                continue

            disposition = str(part.get("Content-Disposition", "") or "").lower()
            if disposition and "attachment" not in disposition and "inline" not in disposition:
                # Explicit non-attachment disposition: skip.
                continue

            # SECURITY: the file name comes from the (untrusted) email. Keep
            # only its final component so "../", absolute paths or Windows
            # separators cannot make us write outside the month directory,
            # and drop control characters (e.g. NUL) that break file APIs.
            safe_name = Path(filename.replace("\\", "/")).name
            safe_name = "".join(ch for ch in safe_name if ch >= " ")
            if safe_name in ("", ".", ".."):
                continue

            if Path(safe_name).suffix.lower() not in accepted_exts:
                continue
            if part.get_content_maintype() not in ("application", "image"):
                continue

            data = part.get_payload(decode=True)
            if not data:
                continue

            # Classification is based on subject + file name.
            doc_type = _classify_type(subject, safe_name)
            if allowed_doc_types is not None and doc_type not in allowed_doc_types:
                continue

            # Incremental de-dup: skip if (message_id, md5) was synced before.
            # MD5 is used as a content fingerprint only, not for security.
            file_hash = hashlib.md5(data, usedforsecurity=False).hexdigest()  # nosec
            already_synced = conn.execute(
                "SELECT 1 FROM attachment_history WHERE message_id = ? AND file_hash = ?",
                (msg_id, file_hash),
            ).fetchone()
            if already_synced:
                continue

            # Create the directory only once we know something will be saved.
            base_dir = _ensure_month_dir(month_str, doc_type)
            mime = part.get_content_type() or "application/octet-stream"

            # Pick a free name. The suffix counter is always applied to the
            # ORIGINAL stem, giving a.pdf, a_1.pdf, a_2.pdf (not a_1_2.pdf).
            stem, suffix = Path(safe_name).stem, Path(safe_name).suffix
            file_path = base_dir / safe_name
            counter = 1
            while file_path.exists():
                file_path = base_dir / f"{stem}_{counter}{suffix}"
                counter += 1

            file_path.write_bytes(data)

            conn.execute(
                """
                INSERT OR IGNORE INTO attachment_history
                    (message_id, file_hash, month, doc_type, file_name, file_path)
                VALUES (?, ?, ?, ?, ?, ?)
                """,
                (msg_id, file_hash, month_str, doc_type, file_path.name, str(file_path)),
            )
            # Commit per attachment so history survives a later failure.
            conn.commit()

            saved.append((file_path.name, str(file_path), mime, data, doc_type))
    finally:
        conn.close()

    return saved
|
||
|
||
|
||
def _decode_imap_utf7(s: str | bytes) -> str:
|
||
"""Decode IMAP4 UTF-7 mailbox name (RFC 3501). Returns decoded string."""
|
||
if isinstance(s, bytes):
|
||
s = s.decode("ascii", errors="replace")
|
||
if "&" not in s:
|
||
return s
|
||
parts = s.split("&")
|
||
out = [parts[0]]
|
||
for i in range(1, len(parts)):
|
||
chunk = parts[i]
|
||
if "-" in chunk:
|
||
u, rest = chunk.split("-", 1)
|
||
if u == "":
|
||
out.append("&")
|
||
else:
|
||
try:
|
||
# IMAP UTF-7: &BASE64- where BASE64 is modified (,+ instead of /,=)
|
||
pad = (4 - len(u) % 4) % 4
|
||
b = (u + "=" * pad).translate(str.maketrans(",+", "/="))
|
||
decoded = __import__("base64").b64decode(b).decode("utf-16-be")
|
||
out.append(decoded)
|
||
except Exception:
|
||
out.append("&" + chunk)
|
||
out.append(rest)
|
||
else:
|
||
out.append("&" + chunk)
|
||
return "".join(out)
|
||
|
||
|
||
def _parse_list_response(data: List[bytes]) -> List[Tuple[str, str]]:
    """Turn ``imap.list()`` response lines into ``(raw_name, decoded_name)`` pairs.

    Each line has the form ``(flags) "delim" "mailbox"``; the mailbox name is
    taken to be the last shell-style token on the line. Entries that are not
    bytes, cannot be tokenized, or are empty are skipped.
    """
    import shlex

    mailboxes: List[Tuple[str, str]] = []
    for entry in data:
        if not isinstance(entry, bytes):
            continue
        text = entry.decode("ascii", errors="replace")
        try:
            tokens = shlex.split(text)
        except ValueError:
            continue
        if tokens:
            # Mailbox name is the last part (RFC 3501 LIST: (attrs) delim name)
            raw_name = tokens[-1]
            mailboxes.append((raw_name, _decode_imap_utf7(raw_name)))
    return mailboxes
|
||
|
||
|
||
def _list_mailboxes(imap: imaplib.IMAP4_SSL) -> List[Tuple[str, str]]:
    """Issue LIST on the connection and return ``[(raw_name, decoded_name), ...]``.

    Returns an empty list when the server answers with a non-OK status or no data.
    """
    status, data = imap.list()
    succeeded = status == "OK" and bool(data)
    return _parse_list_response(data) if succeeded else []
|
||
|
||
|
||
def list_mailboxes_for_config(host: str, port: int, user: str, password: str) -> List[Tuple[str, str]]:
    """Connect, log in and list all mailboxes (for the mailbox dropdown).

    Args:
        host: IMAP server host name.
        port: IMAP-over-TLS port (converted with ``int``).
        user: Login name.
        password: Login password.

    Returns:
        ``[(raw_name, decoded_name), ...]``.

    Raises:
        Connection, TLS and login errors from ``imaplib``/``ssl`` propagate.
    """
    # Use an explicit default context (certificate + host name verification),
    # the same as the sync path, so credentials are never sent over a
    # connection whose certificate was not checked.
    tls_context = ssl.create_default_context()
    with imaplib.IMAP4_SSL(host, int(port), ssl_context=tls_context) as imap:
        imap.login(user, password)
        return _list_mailboxes(imap)
|
||
|
||
|
||
def _select_mailbox(imap: imaplib.IMAP4_SSL, mailbox: str) -> bool:
|
||
"""
|
||
Robust mailbox selection with deep discovery scan.
|
||
|
||
Strategy:
|
||
1. LIST all folders, log raw lines for debugging.
|
||
2. Look for entry containing '\\Inbox' flag; if found, SELECT that folder.
|
||
3. Try standard candidates: user-configured name / INBOX / common UTF-7 收件箱编码.
|
||
4. As last resort, attempt SELECT on every listed folder and log which succeed/fail.
|
||
"""
|
||
logger = logging.getLogger(__name__)
|
||
|
||
name = (mailbox or "INBOX").strip() or "INBOX"
|
||
|
||
# 1) Discovery scan: list all folders and log raw entries
|
||
try:
|
||
status, data = imap.list()
|
||
if status != "OK" or not data:
|
||
logger.warning("IMAP LIST returned no data or non-OK status: %s", status)
|
||
data = []
|
||
except Exception as exc:
|
||
logger.error("IMAP LIST failed: %s", exc)
|
||
data = []
|
||
|
||
logger.info("IMAP Discovery Scan: listing all folders for mailbox=%s", name)
|
||
for raw in data:
|
||
logger.info("IMAP FOLDER RAW: %r", raw)
|
||
|
||
# 2) 优先按 \\Inbox 属性查找“真正的收件箱”
|
||
inbox_candidates: list[str] = []
|
||
for raw in data:
|
||
line = raw.decode("utf-8", errors="ignore") if isinstance(raw, bytes) else str(raw)
|
||
if "\\Inbox" not in line:
|
||
continue
|
||
m = re.search(r'"([^"]+)"\s*$', line)
|
||
if not m:
|
||
continue
|
||
folder_name = m.group(1)
|
||
inbox_candidates.append(folder_name)
|
||
|
||
# 3) 补充常规候选:配置名 / INBOX / 常见 UTF-7 收件箱编码
|
||
primary_names = [name, "INBOX"]
|
||
utf7_names = ["&XfJT0ZTx-"]
|
||
for nm in primary_names + utf7_names:
|
||
if nm not in inbox_candidates:
|
||
inbox_candidates.append(nm)
|
||
|
||
logger.info("IMAP Inbox candidate list (ordered): %r", inbox_candidates)
|
||
|
||
# 4) 依次尝试候选收件箱
|
||
for candidate in inbox_candidates:
|
||
for readonly in (False, True):
|
||
try:
|
||
status, _ = imap.select(candidate, readonly=readonly)
|
||
logger.info(
|
||
"IMAP SELECT candidate=%r readonly=%s -> %s", candidate, readonly, status
|
||
)
|
||
if status == "OK":
|
||
return True
|
||
except Exception as exc:
|
||
logger.warning(
|
||
"IMAP SELECT failed for candidate=%r readonly=%s: %s",
|
||
candidate,
|
||
readonly,
|
||
exc,
|
||
)
|
||
|
||
# 5) 最后手段:尝试 LIST 返回的每一个文件夹
|
||
logger.info("IMAP Fallback: trying SELECT on every listed folder...")
|
||
for raw in data:
|
||
line = raw.decode("utf-8", errors="ignore") if isinstance(raw, bytes) else str(raw)
|
||
m = re.search(r'"([^"]+)"\s*$', line)
|
||
if not m:
|
||
continue
|
||
folder_name = m.group(1)
|
||
for readonly in (False, True):
|
||
try:
|
||
status, _ = imap.select(folder_name, readonly=readonly)
|
||
logger.info(
|
||
"IMAP SELECT fallback folder=%r readonly=%s -> %s",
|
||
folder_name,
|
||
readonly,
|
||
status,
|
||
)
|
||
if status == "OK":
|
||
return True
|
||
except Exception as exc:
|
||
logger.warning(
|
||
"IMAP SELECT fallback failed for folder=%r readonly=%s: %s",
|
||
folder_name,
|
||
readonly,
|
||
exc,
|
||
)
|
||
|
||
logger.error("IMAP: unable to SELECT any inbox-like folder for mailbox=%s", name)
|
||
return False
|
||
|
||
|
||
def _imap_date(d: date) -> str:
|
||
# IMAP date format: 16-Mar-2026 (English month)
|
||
import calendar
|
||
return f"{d.day:02d}-{calendar.month_abbr[d.month]}-{d.year}"
|
||
|
||
|
||
def _pick_latest_msg_id(imap: imaplib.IMAP4_SSL, msg_ids: List[bytes]) -> bytes | None:
|
||
"""从一批 msg_id 中按 INTERNALDATE 选择最新的一封。"""
|
||
latest_id: bytes | None = None
|
||
latest_ts: float = -1.0
|
||
for mid in msg_ids:
|
||
try:
|
||
typ, data = imap.fetch(mid, "(INTERNALDATE)")
|
||
if typ != "OK" or not data or not data[0]:
|
||
continue
|
||
# imaplib.Internaldate2tuple expects a bytes response line
|
||
raw = data[0]
|
||
if isinstance(raw, tuple):
|
||
raw = raw[0]
|
||
if not isinstance(raw, (bytes, bytearray)):
|
||
raw = str(raw).encode("utf-8", errors="ignore")
|
||
t = imaplib.Internaldate2tuple(raw)
|
||
if not t:
|
||
continue
|
||
import time
|
||
ts = time.mktime(t)
|
||
if ts > latest_ts:
|
||
latest_ts = ts
|
||
latest_id = mid
|
||
except Exception:
|
||
continue
|
||
return latest_id
|
||
|
||
|
||
def _send_imap_client_id(imap: imaplib.IMAP4_SSL) -> None:
    """Best-effort: send an IMAP ID command identifying as a desktop client.

    NetEase/163 and similar providers silently restrict SELECT for unknown
    clients, so a Foxmail-style identity is announced. A failure is only
    logged; it must not block the sync.
    """
    logger = logging.getLogger(__name__)
    id_str = (
        '("name" "Foxmail" '
        '"version" "7.2.25.170" '
        '"vendor" "Tencent" '
        '"os" "Windows" '
        '"os-version" "10.0")'
    )
    try:
        logger.info("IMAP sending Foxmail-style ID: %s", id_str)
        # _simple_command sends the command AND waits for its tagged reply,
        # returning (typ, data); the lower-level _command returns only the tag.
        typ, dat = imap._simple_command("ID", id_str)  # type: ignore[attr-defined]
        logger.info("IMAP ID command result: %s %r", typ, dat)
    except Exception as exc:
        logger.warning("IMAP ID command failed: %s", exc)


def _sync_one_account(
    config: Dict[str, Any],
    db: Session,
    results: List[Dict[str, Any]],
    *,
    mode: str = "incremental",
    start_date: date | None = None,
    end_date: date | None = None,
    doc_types: list[str] | None = None,
) -> None:
    """Sync finance attachments from one IMAP account.

    For every matching message the attachments are saved, invoices are OCR'd
    and renamed, a ``FinanceRecord`` is added and flushed, tags are extracted
    (best-effort), and a summary dict is appended to ``results``. Processed
    messages are flagged ``\\Seen \\Flagged``. Records are flushed but not
    committed here.

    Args:
        config: Account settings; reads ``host``, ``user``, ``password``,
            ``port`` (default 993) and ``mailbox`` (default ``INBOX``).
            Returns silently if host, user or password is missing.
        db: Database session the records are added to.
        results: Output list, extended in place.
        mode: ``incremental`` (everything on the first sync, otherwise only
            UNSEEN), ``all`` (everything), or ``latest`` (newest message
            only). Unknown values fall back to ``incremental``.
        start_date: Optional inclusive lower bound (IMAP SINCE).
        end_date: Optional inclusive upper bound.
        doc_types: Optional filter of ``invoices``/``receipts``/``statements``;
            if no valid value remains, no filter is applied.

    Raises:
        RuntimeError: If no mailbox could be selected.
        Connection, login and storage errors propagate to the caller.
    """
    import json

    logger = logging.getLogger(__name__)

    allowed: set[str] | None = None
    if doc_types:
        requested = {d.strip().lower() for d in doc_types if d and d.strip()}
        allowed = (requested & {"invoices", "receipts", "statements"}) or None

    host = config.get("host")
    user = config.get("user")
    password = config.get("password")
    # "or 993" also covers a port key that is present but None/empty.
    port = int(config.get("port") or 993)
    mailbox = (config.get("mailbox") or "INBOX").strip() or "INBOX"

    if not all([host, user, password]):
        return

    # Use strict TLS context for modern protocols (TLS 1.2+)
    tls_context = ssl.create_default_context()

    with imaplib.IMAP4_SSL(host, port, ssl_context=tls_context) as imap:
        imap.login(user, password)
        # Low-level protocol tracing helps diagnose mailbox selection issues
        # with specific providers. It is switched on only AFTER login: at this
        # level imaplib prints every command it sends, which would include
        # the LOGIN line with the password.
        imap.debug = 4

        _send_imap_client_id(imap)

        if not _select_mailbox(imap, mailbox):
            raise RuntimeError(
                f"无法选择邮箱「{mailbox}」,请检查该账户的 Mailbox 配置(如 163 使用 INBOX)"
            )

        # Supported modes:
        # - incremental: everything on first sync, otherwise UNSEEN
        # - all: everything (optionally within a date range)
        # - latest: newest message only (optionally within a date range)
        mode = (mode or "incremental").strip().lower()
        if mode not in ("incremental", "all", "latest"):
            mode = "incremental"

        base_criterion = "ALL"
        if mode == "incremental" and _has_sync_history():
            base_criterion = "UNSEEN"

        criteria: List[str] = [base_criterion]
        if start_date:
            criteria += ["SINCE", _imap_date(start_date)]
        if end_date:
            # BEFORE is exclusive; add one day to make end_date inclusive
            criteria += ["BEFORE", _imap_date(end_date + timedelta(days=1))]

        logger.info(
            "Finance sync: mode=%s criterion=%s range=%s~%s",
            mode,
            base_criterion,
            start_date,
            end_date,
        )

        status, data = imap.search(None, *criteria)
        if status != "OK":
            return

        id_list: List[bytes] = data[0].split() if data and data[0] else []
        logger.info("Finance sync: matched messages=%d (mode=%s)", len(id_list), mode)
        if not id_list:
            return

        if mode == "latest":
            latest = _pick_latest_msg_id(imap, id_list)
            id_list = [latest] if latest else []

        for msg_id in id_list:
            status, msg_data = imap.fetch(msg_id, "(RFC822)")
            if status != "OK":
                continue

            # The body arrives as a (header, literal) tuple; the response may
            # be empty (e.g. [None]) when the message has vanished.
            raw_email = next(
                (
                    item[1]
                    for item in (msg_data or [])
                    if isinstance(item, tuple)
                    and len(item) >= 2
                    and isinstance(item[1], (bytes, bytearray))
                ),
                None,
            )
            if raw_email is None:
                logger.warning("Finance sync: empty FETCH response for message %r", msg_id)
                continue

            msg = email.message_from_bytes(raw_email)
            month_str = _parse_email_date(msg).strftime("%Y-%m")

            saved = _save_attachment(msg, month_str, allowed_doc_types=allowed)
            for file_name, file_path, mime, raw_bytes, doc_type in saved:
                final_name = file_name
                final_path = file_path
                amount = None
                billing_date = None

                if doc_type == "invoices":
                    date_str = None
                    try:
                        amount, date_str = _run_invoice_ocr_sync(file_path, mime, raw_bytes)
                    except Exception:
                        # The file is already saved and recorded in the sync
                        # history, so it would not be picked up again; keep it
                        # with empty OCR metadata instead of losing the record.
                        logger.exception("Invoice OCR failed for %s", file_path)
                        amount = None

                    billing_date = date.today()
                    if date_str:
                        try:
                            billing_date = date.fromisoformat(str(date_str)[:10])
                        except ValueError:
                            pass
                    final_name, final_path = _rename_invoice_file(
                        file_path, amount, billing_date
                    )

                record = FinanceRecord(
                    month=month_str,
                    type=doc_type,
                    file_name=final_name,
                    file_path=final_path,
                    tags=None,
                    meta_json=None,
                    amount=amount,
                    billing_date=billing_date,
                )
                db.add(record)
                db.flush()

                # Automatic tagging right after sync (best-effort).
                try:
                    from backend.app.services.ai_service import extract_finance_tags

                    content_text = _extract_text_for_tagging(final_path, mime, raw_bytes)
                    tags, meta = asyncio.run(extract_finance_tags(content_text, doc_type, final_name))  # type: ignore[arg-type]
                    if tags:
                        record.tags = ",".join(tags)
                    if meta:
                        record.meta_json = json.dumps(meta, ensure_ascii=False)
                    db.flush()
                except Exception:
                    # Tagging is optional, but do not hide why it failed.
                    logger.warning(
                        "Finance sync: tagging failed for %s", final_path, exc_info=True
                    )

                results.append({
                    "id": record.id,
                    "month": record.month,
                    "type": record.type,
                    "file_name": record.file_name,
                    "file_path": record.file_path,
                })

            imap.store(msg_id, "+FLAGS", "\\Seen \\Flagged")
|
||
|
||
|
||
async def sync_finance_emails(
    *,
    mode: str = "incremental",
    start_date: date | None = None,
    end_date: date | None = None,
    doc_types: list[str] | None = None,
) -> List[Dict[str, Any]]:
    """Sync finance attachments from every configured email account.

    Accounts come from ``get_email_configs_for_sync()``. The blocking work
    runs in a worker thread. Each account is processed independently: a
    failure is logged and recorded, and the remaining accounts still run.

    Args:
        mode: ``incremental``, ``all`` or ``latest`` (see ``_sync_one_account``).
        start_date: Optional inclusive lower date bound.
        end_date: Optional inclusive upper date bound.
        doc_types: Optional document-type filter.

    Returns:
        One summary dict per saved record (``id``, ``month``, ``type``,
        ``file_name``, ``file_path``).

    Raises:
        RuntimeError: If no account is configured, or if nothing was synced
            and at least one account failed (message lists all failures).
    """
    logger = logging.getLogger(__name__)

    def _sync() -> List[Dict[str, Any]]:
        from backend.app.routers.email_configs import get_email_configs_for_sync

        configs = get_email_configs_for_sync()
        if not configs:
            raise RuntimeError("未配置邮箱。请在 设置 → 邮箱账户 中添加,或配置 IMAP_* 环境变量。")

        results: List[Dict[str, Any]] = []
        errors: List[str] = []
        db = SessionLocal()
        try:
            for config in configs:
                checkpoint = len(results)
                try:
                    _sync_one_account(
                        config,
                        db,
                        results,
                        mode=mode,
                        start_date=start_date,
                        end_date=end_date,
                        doc_types=doc_types,
                    )
                    db.commit()
                except Exception as e:
                    # One failing account must not stop the others: record
                    # the error (and log it, so partial failures are visible
                    # even when other accounts succeed), then continue.
                    user = config.get("user", "") or config.get("id", "")
                    logger.exception("Finance sync failed for account %s", user)
                    errors.append(f"同步账户 {user} 失败: {e}")
                    try:
                        # Keep the records flushed before the failure.
                        db.commit()
                    except Exception:
                        # The session itself is broken (e.g. failed flush):
                        # reset it for the next account and drop the results
                        # whose records were just rolled back.
                        db.rollback()
                        del results[checkpoint:]
        finally:
            db.close()

        if not results and errors:
            # Nothing was synced and something failed: raise so the caller
            # can show the detailed reasons.
            raise RuntimeError("; ".join(errors))

        return results

    return await asyncio.to_thread(_sync)
|
||
|
||
|
||
async def create_monthly_zip(month_str: str) -> str:
    """Zip the finance folder of one month and return the archive path.

    The archive is written to ``FINANCE_BASE_DIR/<month_str>.zip`` and keeps
    paths relative to ``FINANCE_BASE_DIR``, so the month folder and its
    sub-folders appear inside the zip. The work runs in a worker thread.

    Args:
        month_str: Month in ``YYYY-MM`` form.

    Returns:
        Path of the created zip file, as a string.

    Raises:
        FileNotFoundError: If ``month_str`` is not of the form ``YYYY-MM`` or
            no directory exists for it.
    """
    import zipfile

    def _zip() -> str:
        # month_str becomes part of a filesystem path: accept only YYYY-MM so
        # values such as "../.." cannot point outside FINANCE_BASE_DIR. The
        # same exception type as for a missing month is raised, so existing
        # callers need no new handling.
        if not isinstance(month_str, str) or not re.fullmatch(r"[0-9]{4}-[0-9]{2}", month_str):
            raise FileNotFoundError(f"Finance directory for {month_str} not found.")

        month_dir = FINANCE_BASE_DIR / month_str
        if not month_dir.is_dir():
            raise FileNotFoundError(f"Finance directory for {month_str} not found.")

        zip_path = FINANCE_BASE_DIR / f"{month_str}.zip"

        with zipfile.ZipFile(zip_path, "w", compression=zipfile.ZIP_DEFLATED) as zf:
            for root, _, files in os.walk(month_dir):
                for file in files:
                    full_path = Path(root) / file
                    zf.write(full_path, arcname=full_path.relative_to(FINANCE_BASE_DIR))

        return str(zip_path)

    return await asyncio.to_thread(_zip)
|