Files
AiTool/local.py
2026-03-15 16:38:59 +08:00

231 lines
7.8 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
本地邮箱同步测试脚本(不依赖项目代码)
- 支持多个 IMAP 账户
- 每个账户单独测试登录、选择邮箱夹、列出未读邮件主题
- 结束时打印哪些账户成功、哪些失败(以及失败原因)
用来在对接「财务归档 → 同步」之前,本地先把邮箱配置调通。
"""
import imaplib
import email
from email.header import decode_header
from dataclasses import dataclass
from typing import List, Tuple, Optional
# ===== 1. 在这里配置你的邮箱账户 =====
# 163 示例IMAP 服务器 imap.163.com端口 993密码为「IMAP/SMTP 授权码」
ACCOUNTS = [
{
"name": "163 财务邮箱",
"host": "imap.163.com",
"port": 993,
"user": "danielghost@163.com",
"password": "TZjkMANWyYEDXui7",
"mailbox": "INBOX",
},
]
# ===== 2. 工具函数 =====
def _decode_header_value(value: Optional[str]) -> str:
if not value:
return ""
parts = decode_header(value)
decoded = ""
for text, enc in parts:
if isinstance(text, bytes):
decoded += text.decode(enc or "utf-8", errors="ignore")
else:
decoded += text
return decoded
def _list_mailboxes(imap: imaplib.IMAP4_SSL) -> List[Tuple[str, str]]:
"""列出所有邮箱夹,返回 [(raw_name, decoded_name)]"""
status, data = imap.list()
if status != "OK" or not data:
return []
result: List[Tuple[str, str]] = []
for line in data:
if not isinstance(line, bytes):
continue
try:
line_str = line.decode("ascii", errors="replace")
except Exception:
continue
# 典型格式b'(\\HasNoChildren) "/" "&UXZO1mWHTvZZOQ-"'
parts = line_str.split(" ")
if not parts:
continue
raw = parts[-1].strip('"')
# 简单处理 UTF-7足够看中文「收件箱」
try:
decoded = raw.encode("latin1").decode("utf-7")
except Exception:
decoded = raw
result.append((raw, decoded))
return result
def _select_mailbox(imap: imaplib.IMAP4_SSL, mailbox: str) -> bool:
"""
尝试选中邮箱夹:
1. 直接 SELECT 配置的名字 / INBOX读写、只读
2. 尝试常见 UTF-7 编码收件箱(如 &XfJT0ZTx-
3. 遍历 LIST寻找带 \\Inbox 标记或名称包含 INBOX/收件箱 的文件夹,再 SELECT 实际名称
"""
import re
name = (mailbox or "INBOX").strip() or "INBOX"
# 1) 优先尝试配置名和标准 INBOX
primary_candidates = []
if name not in primary_candidates:
primary_candidates.append(name)
if "INBOX" not in primary_candidates:
primary_candidates.append("INBOX")
for candidate in primary_candidates:
for readonly in (False, True):
print(f" - 尝试 SELECT '{candidate}' (readonly={readonly}) ...")
try:
status, _ = imap.select(candidate, readonly=readonly)
if status == "OK":
print(" ✓ 直接 SELECT 成功")
return True
except Exception as e:
print(f" ⚠ 直接 SELECT 失败: {e}")
# 2) 尝试常见 UTF-7 编码收件箱
for candidate in ["&XfJT0ZTx-"]:
for readonly in (False, True):
print(f" - 尝试 UTF-7 收件箱 '{candidate}' (readonly={readonly}) ...")
try:
status, _ = imap.select(candidate, readonly=readonly)
if status == "OK":
print(" ✓ UTF-7 收件箱 SELECT 成功")
return True
except Exception as e:
print(f" ⚠ SELECT '{candidate}' 失败: {e}")
# 3) 通过 LIST 结果匹配带 \\Inbox 或名称包含 INBOX/收件箱 的文件夹
print(" - 尝试通过 LIST 匹配文件夹 ...")
try:
status, data = imap.list()
if status != "OK" or not data:
print(" ⚠ LIST 返回为空或非 OK")
print(" ✗ 无法选择任何收件箱,请检查 mailbox 名称或在 UI 里用「加载文件夹」重选")
return False
except Exception as e:
print(f" ⚠ LIST 失败: {e}")
print(" ✗ 无法选择任何收件箱,请检查 mailbox 名称或在 UI 里用「加载文件夹」重选")
return False
for line in data:
if isinstance(line, bytes):
line_str = line.decode("utf-8", errors="ignore")
else:
line_str = line
if "\\Inbox" not in line_str and all(
kw not in line_str for kw in ['"INBOX"', '"Inbox"', '"收件箱"']
):
continue
m = re.search(r'"([^"]+)"\s*$', line_str)
if not m:
continue
actual_name = m.group(1)
print(f" 尝试 SELECT 列表中的 '{actual_name}' ...")
for readonly in (False, True):
try:
status2, _ = imap.select(actual_name, readonly=readonly)
if status2 == "OK":
print(" ✓ 通过 LIST 匹配成功")
return True
except Exception as e:
print(f" ⚠ SELECT '{actual_name}' (readonly={readonly}) 失败: {e}")
print(" ✗ 无法选择任何收件箱,请检查 mailbox 名称或在 UI 里用「加载文件夹」重选")
return False
@dataclass
class SyncResult:
name: str
user: str
ok: bool
error: Optional[str] = None
unread_count: int = 0
# ===== 3. 主逻辑 =====
def sync_account(conf: dict) -> SyncResult:
name = conf.get("name") or conf.get("user") or "未命名账户"
host = conf["host"]
port = int(conf.get("port", 993))
user = conf["user"]
password = conf["password"]
mailbox = conf.get("mailbox", "INBOX")
print(f"\n=== 开始同步账户:{name} ({user}) ===")
try:
with imaplib.IMAP4_SSL(host, port) as imap:
print(f" - 连接 {host}:{port} ...")
imap.login(user, password)
print(" ✓ 登录成功")
if not _select_mailbox(imap, mailbox):
return SyncResult(name=name, user=user, ok=False, error=f"无法选择邮箱夹 {mailbox}")
status, data = imap.search(None, "UNSEEN")
if status != "OK":
return SyncResult(name=name, user=user, ok=False, error="SEARCH UNSEEN 失败")
ids = data[0].split()
print(f" ✓ 未读邮件数量:{len(ids)}")
for msg_id in ids[:10]: # 只看前 10 封,避免刷屏
status, msg_data = imap.fetch(msg_id, "(RFC822)")
if status != "OK" or not msg_data:
continue
raw_email = msg_data[0][1]
msg = email.message_from_bytes(raw_email)
subject = _decode_header_value(msg.get("Subject"))
print(f" - 未读主题:{subject!r}")
return SyncResult(name=name, user=user, ok=True, unread_count=len(ids))
except Exception as e:
return SyncResult(name=name, user=user, ok=False, error=str(e))
def main():
results: List[SyncResult] = []
for conf in ACCOUNTS:
results.append(sync_account(conf))
print("\n=== 汇总 ===")
for r in results:
if r.ok:
print(f"{r.name} ({r.user}) 同步成功,未读 {r.unread_count}")
else:
print(f"{r.name} ({r.user}) 同步失败:{r.error}")
failed = [r for r in results if not r.ok]
if failed:
print("\n以下账户未同步成功,请根据错误信息调整配置或在系统 UI 里重新选择邮箱夹:")
for r in failed:
print(f" - {r.name} ({r.user}){r.error}")
else:
print("\n所有账户均同步成功。")
if __name__ == "__main__":
main()