fix:优化数据

This commit is contained in:
丹尼尔
2026-03-15 16:38:59 +08:00
parent a609f81a36
commit 3aa1a586e5
43 changed files with 14565 additions and 294 deletions

231
local.py Normal file
View File

@@ -0,0 +1,231 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
本地邮箱同步测试脚本(不依赖项目代码)
- 支持多个 IMAP 账户
- 每个账户单独测试登录、选择邮箱夹、列出未读邮件主题
- 结束时打印哪些账户成功、哪些失败(以及失败原因)
用来在对接「财务归档 → 同步」之前,本地先把邮箱配置调通。
"""
import imaplib
import email
from email.header import decode_header
from dataclasses import dataclass
from typing import List, Tuple, Optional
# ===== 1. 在这里配置你的邮箱账户 =====
# 163 示例IMAP 服务器 imap.163.com端口 993密码为「IMAP/SMTP 授权码」
ACCOUNTS = [
{
"name": "163 财务邮箱",
"host": "imap.163.com",
"port": 993,
"user": "danielghost@163.com",
"password": "TZjkMANWyYEDXui7",
"mailbox": "INBOX",
},
]
# ===== 2. 工具函数 =====
def _decode_header_value(value: Optional[str]) -> str:
if not value:
return ""
parts = decode_header(value)
decoded = ""
for text, enc in parts:
if isinstance(text, bytes):
decoded += text.decode(enc or "utf-8", errors="ignore")
else:
decoded += text
return decoded
def _list_mailboxes(imap: imaplib.IMAP4_SSL) -> List[Tuple[str, str]]:
"""列出所有邮箱夹,返回 [(raw_name, decoded_name)]"""
status, data = imap.list()
if status != "OK" or not data:
return []
result: List[Tuple[str, str]] = []
for line in data:
if not isinstance(line, bytes):
continue
try:
line_str = line.decode("ascii", errors="replace")
except Exception:
continue
# 典型格式b'(\\HasNoChildren) "/" "&UXZO1mWHTvZZOQ-"'
parts = line_str.split(" ")
if not parts:
continue
raw = parts[-1].strip('"')
# 简单处理 UTF-7足够看中文「收件箱」
try:
decoded = raw.encode("latin1").decode("utf-7")
except Exception:
decoded = raw
result.append((raw, decoded))
return result
def _select_mailbox(imap: imaplib.IMAP4_SSL, mailbox: str) -> bool:
"""
尝试选中邮箱夹:
1. 直接 SELECT 配置的名字 / INBOX读写、只读
2. 尝试常见 UTF-7 编码收件箱(如 &XfJT0ZTx-
3. 遍历 LIST寻找带 \\Inbox 标记或名称包含 INBOX/收件箱 的文件夹,再 SELECT 实际名称
"""
import re
name = (mailbox or "INBOX").strip() or "INBOX"
# 1) 优先尝试配置名和标准 INBOX
primary_candidates = []
if name not in primary_candidates:
primary_candidates.append(name)
if "INBOX" not in primary_candidates:
primary_candidates.append("INBOX")
for candidate in primary_candidates:
for readonly in (False, True):
print(f" - 尝试 SELECT '{candidate}' (readonly={readonly}) ...")
try:
status, _ = imap.select(candidate, readonly=readonly)
if status == "OK":
print(" ✓ 直接 SELECT 成功")
return True
except Exception as e:
print(f" ⚠ 直接 SELECT 失败: {e}")
# 2) 尝试常见 UTF-7 编码收件箱
for candidate in ["&XfJT0ZTx-"]:
for readonly in (False, True):
print(f" - 尝试 UTF-7 收件箱 '{candidate}' (readonly={readonly}) ...")
try:
status, _ = imap.select(candidate, readonly=readonly)
if status == "OK":
print(" ✓ UTF-7 收件箱 SELECT 成功")
return True
except Exception as e:
print(f" ⚠ SELECT '{candidate}' 失败: {e}")
# 3) 通过 LIST 结果匹配带 \\Inbox 或名称包含 INBOX/收件箱 的文件夹
print(" - 尝试通过 LIST 匹配文件夹 ...")
try:
status, data = imap.list()
if status != "OK" or not data:
print(" ⚠ LIST 返回为空或非 OK")
print(" ✗ 无法选择任何收件箱,请检查 mailbox 名称或在 UI 里用「加载文件夹」重选")
return False
except Exception as e:
print(f" ⚠ LIST 失败: {e}")
print(" ✗ 无法选择任何收件箱,请检查 mailbox 名称或在 UI 里用「加载文件夹」重选")
return False
for line in data:
if isinstance(line, bytes):
line_str = line.decode("utf-8", errors="ignore")
else:
line_str = line
if "\\Inbox" not in line_str and all(
kw not in line_str for kw in ['"INBOX"', '"Inbox"', '"收件箱"']
):
continue
m = re.search(r'"([^"]+)"\s*$', line_str)
if not m:
continue
actual_name = m.group(1)
print(f" 尝试 SELECT 列表中的 '{actual_name}' ...")
for readonly in (False, True):
try:
status2, _ = imap.select(actual_name, readonly=readonly)
if status2 == "OK":
print(" ✓ 通过 LIST 匹配成功")
return True
except Exception as e:
print(f" ⚠ SELECT '{actual_name}' (readonly={readonly}) 失败: {e}")
print(" ✗ 无法选择任何收件箱,请检查 mailbox 名称或在 UI 里用「加载文件夹」重选")
return False
@dataclass
class SyncResult:
name: str
user: str
ok: bool
error: Optional[str] = None
unread_count: int = 0
# ===== 3. 主逻辑 =====
def sync_account(conf: dict) -> SyncResult:
name = conf.get("name") or conf.get("user") or "未命名账户"
host = conf["host"]
port = int(conf.get("port", 993))
user = conf["user"]
password = conf["password"]
mailbox = conf.get("mailbox", "INBOX")
print(f"\n=== 开始同步账户:{name} ({user}) ===")
try:
with imaplib.IMAP4_SSL(host, port) as imap:
print(f" - 连接 {host}:{port} ...")
imap.login(user, password)
print(" ✓ 登录成功")
if not _select_mailbox(imap, mailbox):
return SyncResult(name=name, user=user, ok=False, error=f"无法选择邮箱夹 {mailbox}")
status, data = imap.search(None, "UNSEEN")
if status != "OK":
return SyncResult(name=name, user=user, ok=False, error="SEARCH UNSEEN 失败")
ids = data[0].split()
print(f" ✓ 未读邮件数量:{len(ids)}")
for msg_id in ids[:10]: # 只看前 10 封,避免刷屏
status, msg_data = imap.fetch(msg_id, "(RFC822)")
if status != "OK" or not msg_data:
continue
raw_email = msg_data[0][1]
msg = email.message_from_bytes(raw_email)
subject = _decode_header_value(msg.get("Subject"))
print(f" - 未读主题:{subject!r}")
return SyncResult(name=name, user=user, ok=True, unread_count=len(ids))
except Exception as e:
return SyncResult(name=name, user=user, ok=False, error=str(e))
def main():
results: List[SyncResult] = []
for conf in ACCOUNTS:
results.append(sync_account(conf))
print("\n=== 汇总 ===")
for r in results:
if r.ok:
print(f"{r.name} ({r.user}) 同步成功,未读 {r.unread_count}")
else:
print(f"{r.name} ({r.user}) 同步失败:{r.error}")
failed = [r for r in results if not r.ok]
if failed:
print("\n以下账户未同步成功,请根据错误信息调整配置或在系统 UI 里重新选择邮箱夹:")
for r in failed:
print(f" - {r.name} ({r.user}){r.error}")
else:
print("\n所有账户均同步成功。")
if __name__ == "__main__":
main()