#!/usr/bin/env python3 # -*- coding: utf-8 -*- """ 本地邮箱同步测试脚本(不依赖项目代码) - 支持多个 IMAP 账户 - 每个账户单独测试登录、选择邮箱夹、列出未读邮件主题 - 结束时打印哪些账户成功、哪些失败(以及失败原因) 用来在对接「财务归档 → 同步」之前,本地先把邮箱配置调通。 """ import imaplib import email from email.header import decode_header from dataclasses import dataclass from typing import List, Tuple, Optional # ===== 1. 在这里配置你的邮箱账户 ===== # 163 示例:IMAP 服务器 imap.163.com,端口 993,密码为「IMAP/SMTP 授权码」 ACCOUNTS = [ { "name": "163 财务邮箱", "host": "imap.163.com", "port": 993, "user": "danielghost@163.com", "password": "TZjkMANWyYEDXui7", "mailbox": "INBOX", }, ] # ===== 2. 工具函数 ===== def _decode_header_value(value: Optional[str]) -> str: if not value: return "" parts = decode_header(value) decoded = "" for text, enc in parts: if isinstance(text, bytes): decoded += text.decode(enc or "utf-8", errors="ignore") else: decoded += text return decoded def _list_mailboxes(imap: imaplib.IMAP4_SSL) -> List[Tuple[str, str]]: """列出所有邮箱夹,返回 [(raw_name, decoded_name)]""" status, data = imap.list() if status != "OK" or not data: return [] result: List[Tuple[str, str]] = [] for line in data: if not isinstance(line, bytes): continue try: line_str = line.decode("ascii", errors="replace") except Exception: continue # 典型格式:b'(\\HasNoChildren) "/" "&UXZO1mWHTvZZOQ-"' parts = line_str.split(" ") if not parts: continue raw = parts[-1].strip('"') # 简单处理 UTF-7,足够看中文「收件箱」 try: decoded = raw.encode("latin1").decode("utf-7") except Exception: decoded = raw result.append((raw, decoded)) return result def _select_mailbox(imap: imaplib.IMAP4_SSL, mailbox: str) -> bool: """ 尝试选中邮箱夹: 1. 直接 SELECT 配置的名字 / INBOX(读写、只读) 2. 尝试常见 UTF-7 编码收件箱(如 &XfJT0ZTx-) 3. 遍历 LIST,寻找带 \\Inbox 标记或名称包含 INBOX/收件箱 的文件夹,再 SELECT 实际名称 """ import re name = (mailbox or "INBOX").strip() or "INBOX" # 1) 优先尝试配置名和标准 INBOX primary_candidates = [] if name not in primary_candidates: primary_candidates.append(name) if "INBOX" not in primary_candidates: primary_candidates.append("INBOX") for candidate in primary_candidates: for readonly in (False, True): print(f" - 尝试 SELECT '{candidate}' (readonly={readonly}) ...") try: status, _ = imap.select(candidate, readonly=readonly) if status == "OK": print(" ✓ 直接 SELECT 成功") return True except Exception as e: print(f" ⚠ 直接 SELECT 失败: {e}") # 2) 尝试常见 UTF-7 编码收件箱 for candidate in ["&XfJT0ZTx-"]: for readonly in (False, True): print(f" - 尝试 UTF-7 收件箱 '{candidate}' (readonly={readonly}) ...") try: status, _ = imap.select(candidate, readonly=readonly) if status == "OK": print(" ✓ UTF-7 收件箱 SELECT 成功") return True except Exception as e: print(f" ⚠ SELECT '{candidate}' 失败: {e}") # 3) 通过 LIST 结果匹配带 \\Inbox 或名称包含 INBOX/收件箱 的文件夹 print(" - 尝试通过 LIST 匹配文件夹 ...") try: status, data = imap.list() if status != "OK" or not data: print(" ⚠ LIST 返回为空或非 OK") print(" ✗ 无法选择任何收件箱,请检查 mailbox 名称或在 UI 里用「加载文件夹」重选") return False except Exception as e: print(f" ⚠ LIST 失败: {e}") print(" ✗ 无法选择任何收件箱,请检查 mailbox 名称或在 UI 里用「加载文件夹」重选") return False for line in data: if isinstance(line, bytes): line_str = line.decode("utf-8", errors="ignore") else: line_str = line if "\\Inbox" not in line_str and all( kw not in line_str for kw in ['"INBOX"', '"Inbox"', '"收件箱"'] ): continue m = re.search(r'"([^"]+)"\s*$', line_str) if not m: continue actual_name = m.group(1) print(f" 尝试 SELECT 列表中的 '{actual_name}' ...") for readonly in (False, True): try: status2, _ = imap.select(actual_name, readonly=readonly) if status2 == "OK": print(" ✓ 通过 LIST 匹配成功") return True except Exception as e: print(f" ⚠ SELECT '{actual_name}' (readonly={readonly}) 失败: {e}") print(" ✗ 无法选择任何收件箱,请检查 mailbox 名称或在 UI 里用「加载文件夹」重选") return False @dataclass class SyncResult: name: str user: str ok: bool error: Optional[str] = None unread_count: int = 0 # ===== 3. 主逻辑 ===== def sync_account(conf: dict) -> SyncResult: name = conf.get("name") or conf.get("user") or "未命名账户" host = conf["host"] port = int(conf.get("port", 993)) user = conf["user"] password = conf["password"] mailbox = conf.get("mailbox", "INBOX") print(f"\n=== 开始同步账户:{name} ({user}) ===") try: with imaplib.IMAP4_SSL(host, port) as imap: print(f" - 连接 {host}:{port} ...") imap.login(user, password) print(" ✓ 登录成功") if not _select_mailbox(imap, mailbox): return SyncResult(name=name, user=user, ok=False, error=f"无法选择邮箱夹 {mailbox}") status, data = imap.search(None, "UNSEEN") if status != "OK": return SyncResult(name=name, user=user, ok=False, error="SEARCH UNSEEN 失败") ids = data[0].split() print(f" ✓ 未读邮件数量:{len(ids)}") for msg_id in ids[:10]: # 只看前 10 封,避免刷屏 status, msg_data = imap.fetch(msg_id, "(RFC822)") if status != "OK" or not msg_data: continue raw_email = msg_data[0][1] msg = email.message_from_bytes(raw_email) subject = _decode_header_value(msg.get("Subject")) print(f" - 未读主题:{subject!r}") return SyncResult(name=name, user=user, ok=True, unread_count=len(ids)) except Exception as e: return SyncResult(name=name, user=user, ok=False, error=str(e)) def main(): results: List[SyncResult] = [] for conf in ACCOUNTS: results.append(sync_account(conf)) print("\n=== 汇总 ===") for r in results: if r.ok: print(f"✓ {r.name} ({r.user}) 同步成功,未读 {r.unread_count} 封") else: print(f"✗ {r.name} ({r.user}) 同步失败:{r.error}") failed = [r for r in results if not r.ok] if failed: print("\n以下账户未同步成功,请根据错误信息调整配置或在系统 UI 里重新选择邮箱夹:") for r in failed: print(f" - {r.name} ({r.user}):{r.error}") else: print("\n所有账户均同步成功。") if __name__ == "__main__": main()