231 lines
7.8 KiB
Python
231 lines
7.8 KiB
Python
|
||
|
||
#!/usr/bin/env python3
|
||
# -*- coding: utf-8 -*-
|
||
|
||
"""
|
||
本地邮箱同步测试脚本(不依赖项目代码)
|
||
|
||
- 支持多个 IMAP 账户
|
||
- 每个账户单独测试登录、选择邮箱夹、列出未读邮件主题
|
||
- 结束时打印哪些账户成功、哪些失败(以及失败原因)
|
||
|
||
用来在对接「财务归档 → 同步」之前,本地先把邮箱配置调通。
|
||
"""
|
||
|
||
import imaplib
|
||
import email
|
||
from email.header import decode_header
|
||
from dataclasses import dataclass
|
||
from typing import List, Tuple, Optional
|
||
|
||
|
||
# ===== 1. 在这里配置你的邮箱账户 =====
|
||
# 163 示例:IMAP 服务器 imap.163.com,端口 993,密码为「IMAP/SMTP 授权码」
|
||
ACCOUNTS = [
|
||
{
|
||
"name": "163 财务邮箱",
|
||
"host": "imap.163.com",
|
||
"port": 993,
|
||
"user": "danielghost@163.com",
|
||
"password": "TZjkMANWyYEDXui7",
|
||
"mailbox": "INBOX",
|
||
},
|
||
]
|
||
# ===== 2. 工具函数 =====
|
||
|
||
def _decode_header_value(value: Optional[str]) -> str:
|
||
if not value:
|
||
return ""
|
||
parts = decode_header(value)
|
||
decoded = ""
|
||
for text, enc in parts:
|
||
if isinstance(text, bytes):
|
||
decoded += text.decode(enc or "utf-8", errors="ignore")
|
||
else:
|
||
decoded += text
|
||
return decoded
|
||
|
||
|
||
def _list_mailboxes(imap: imaplib.IMAP4_SSL) -> List[Tuple[str, str]]:
|
||
"""列出所有邮箱夹,返回 [(raw_name, decoded_name)]"""
|
||
status, data = imap.list()
|
||
if status != "OK" or not data:
|
||
return []
|
||
|
||
result: List[Tuple[str, str]] = []
|
||
for line in data:
|
||
if not isinstance(line, bytes):
|
||
continue
|
||
try:
|
||
line_str = line.decode("ascii", errors="replace")
|
||
except Exception:
|
||
continue
|
||
# 典型格式:b'(\\HasNoChildren) "/" "&UXZO1mWHTvZZOQ-"'
|
||
parts = line_str.split(" ")
|
||
if not parts:
|
||
continue
|
||
raw = parts[-1].strip('"')
|
||
# 简单处理 UTF-7,足够看中文「收件箱」
|
||
try:
|
||
decoded = raw.encode("latin1").decode("utf-7")
|
||
except Exception:
|
||
decoded = raw
|
||
result.append((raw, decoded))
|
||
return result
|
||
|
||
|
||
def _select_mailbox(imap: imaplib.IMAP4_SSL, mailbox: str) -> bool:
|
||
"""
|
||
尝试选中邮箱夹:
|
||
1. 直接 SELECT 配置的名字 / INBOX(读写、只读)
|
||
2. 尝试常见 UTF-7 编码收件箱(如 &XfJT0ZTx-)
|
||
3. 遍历 LIST,寻找带 \\Inbox 标记或名称包含 INBOX/收件箱 的文件夹,再 SELECT 实际名称
|
||
"""
|
||
import re
|
||
|
||
name = (mailbox or "INBOX").strip() or "INBOX"
|
||
|
||
# 1) 优先尝试配置名和标准 INBOX
|
||
primary_candidates = []
|
||
if name not in primary_candidates:
|
||
primary_candidates.append(name)
|
||
if "INBOX" not in primary_candidates:
|
||
primary_candidates.append("INBOX")
|
||
|
||
for candidate in primary_candidates:
|
||
for readonly in (False, True):
|
||
print(f" - 尝试 SELECT '{candidate}' (readonly={readonly}) ...")
|
||
try:
|
||
status, _ = imap.select(candidate, readonly=readonly)
|
||
if status == "OK":
|
||
print(" ✓ 直接 SELECT 成功")
|
||
return True
|
||
except Exception as e:
|
||
print(f" ⚠ 直接 SELECT 失败: {e}")
|
||
|
||
# 2) 尝试常见 UTF-7 编码收件箱
|
||
for candidate in ["&XfJT0ZTx-"]:
|
||
for readonly in (False, True):
|
||
print(f" - 尝试 UTF-7 收件箱 '{candidate}' (readonly={readonly}) ...")
|
||
try:
|
||
status, _ = imap.select(candidate, readonly=readonly)
|
||
if status == "OK":
|
||
print(" ✓ UTF-7 收件箱 SELECT 成功")
|
||
return True
|
||
except Exception as e:
|
||
print(f" ⚠ SELECT '{candidate}' 失败: {e}")
|
||
|
||
# 3) 通过 LIST 结果匹配带 \\Inbox 或名称包含 INBOX/收件箱 的文件夹
|
||
print(" - 尝试通过 LIST 匹配文件夹 ...")
|
||
try:
|
||
status, data = imap.list()
|
||
if status != "OK" or not data:
|
||
print(" ⚠ LIST 返回为空或非 OK")
|
||
print(" ✗ 无法选择任何收件箱,请检查 mailbox 名称或在 UI 里用「加载文件夹」重选")
|
||
return False
|
||
except Exception as e:
|
||
print(f" ⚠ LIST 失败: {e}")
|
||
print(" ✗ 无法选择任何收件箱,请检查 mailbox 名称或在 UI 里用「加载文件夹」重选")
|
||
return False
|
||
|
||
for line in data:
|
||
if isinstance(line, bytes):
|
||
line_str = line.decode("utf-8", errors="ignore")
|
||
else:
|
||
line_str = line
|
||
|
||
if "\\Inbox" not in line_str and all(
|
||
kw not in line_str for kw in ['"INBOX"', '"Inbox"', '"收件箱"']
|
||
):
|
||
continue
|
||
|
||
m = re.search(r'"([^"]+)"\s*$', line_str)
|
||
if not m:
|
||
continue
|
||
actual_name = m.group(1)
|
||
print(f" 尝试 SELECT 列表中的 '{actual_name}' ...")
|
||
for readonly in (False, True):
|
||
try:
|
||
status2, _ = imap.select(actual_name, readonly=readonly)
|
||
if status2 == "OK":
|
||
print(" ✓ 通过 LIST 匹配成功")
|
||
return True
|
||
except Exception as e:
|
||
print(f" ⚠ SELECT '{actual_name}' (readonly={readonly}) 失败: {e}")
|
||
|
||
print(" ✗ 无法选择任何收件箱,请检查 mailbox 名称或在 UI 里用「加载文件夹」重选")
|
||
return False
|
||
|
||
|
||
@dataclass
|
||
class SyncResult:
|
||
name: str
|
||
user: str
|
||
ok: bool
|
||
error: Optional[str] = None
|
||
unread_count: int = 0
|
||
|
||
|
||
# ===== 3. 主逻辑 =====
|
||
|
||
def sync_account(conf: dict) -> SyncResult:
|
||
name = conf.get("name") or conf.get("user") or "未命名账户"
|
||
host = conf["host"]
|
||
port = int(conf.get("port", 993))
|
||
user = conf["user"]
|
||
password = conf["password"]
|
||
mailbox = conf.get("mailbox", "INBOX")
|
||
|
||
print(f"\n=== 开始同步账户:{name} ({user}) ===")
|
||
try:
|
||
with imaplib.IMAP4_SSL(host, port) as imap:
|
||
print(f" - 连接 {host}:{port} ...")
|
||
imap.login(user, password)
|
||
print(" ✓ 登录成功")
|
||
|
||
if not _select_mailbox(imap, mailbox):
|
||
return SyncResult(name=name, user=user, ok=False, error=f"无法选择邮箱夹 {mailbox}")
|
||
|
||
status, data = imap.search(None, "UNSEEN")
|
||
if status != "OK":
|
||
return SyncResult(name=name, user=user, ok=False, error="SEARCH UNSEEN 失败")
|
||
|
||
ids = data[0].split()
|
||
print(f" ✓ 未读邮件数量:{len(ids)}")
|
||
for msg_id in ids[:10]: # 只看前 10 封,避免刷屏
|
||
status, msg_data = imap.fetch(msg_id, "(RFC822)")
|
||
if status != "OK" or not msg_data:
|
||
continue
|
||
raw_email = msg_data[0][1]
|
||
msg = email.message_from_bytes(raw_email)
|
||
subject = _decode_header_value(msg.get("Subject"))
|
||
print(f" - 未读主题:{subject!r}")
|
||
return SyncResult(name=name, user=user, ok=True, unread_count=len(ids))
|
||
except Exception as e:
|
||
return SyncResult(name=name, user=user, ok=False, error=str(e))
|
||
|
||
|
||
def main():
|
||
results: List[SyncResult] = []
|
||
for conf in ACCOUNTS:
|
||
results.append(sync_account(conf))
|
||
|
||
print("\n=== 汇总 ===")
|
||
for r in results:
|
||
if r.ok:
|
||
print(f"✓ {r.name} ({r.user}) 同步成功,未读 {r.unread_count} 封")
|
||
else:
|
||
print(f"✗ {r.name} ({r.user}) 同步失败:{r.error}")
|
||
|
||
failed = [r for r in results if not r.ok]
|
||
if failed:
|
||
print("\n以下账户未同步成功,请根据错误信息调整配置或在系统 UI 里重新选择邮箱夹:")
|
||
for r in failed:
|
||
print(f" - {r.name} ({r.user}):{r.error}")
|
||
else:
|
||
print("\n所有账户均同步成功。")
|
||
|
||
|
||
if __name__ == "__main__":
|
||
main() |