This commit is contained in:
丹尼尔
2026-03-11 13:59:22 +08:00
parent 6da73da8d7
commit 152877cef2
18 changed files with 2930 additions and 33 deletions

View File

@@ -3,6 +3,7 @@ import html
import logging
import os
from contextlib import asynccontextmanager
from logging.handlers import RotatingFileHandler
from datetime import datetime
from typing import Any, List, Optional
from urllib.parse import urlencode
@@ -39,10 +40,17 @@ IMAGE_MSG_TYPE = int(os.getenv("IMAGE_MSG_TYPE", "3"))
# 按 key 缓存取码结果与 Data62供后续步骤使用
qrcode_store: dict = {}
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s [%(levelname)s] %(name)s - %(message)s",
)
_LOG_FMT = "%(asctime)s [%(levelname)s] %(name)s - %(message)s"
logging.basicConfig(level=logging.INFO, format=_LOG_FMT)
# 日志落盘:写入 data/logs/app.log便于排查可按 LOG_DIR 覆盖目录)
_data_dir = os.getenv("DATA_DIR") or os.path.join(os.path.dirname(__file__), "data")
_log_dir = os.getenv("LOG_DIR") or os.path.join(_data_dir, "logs")
os.makedirs(_log_dir, exist_ok=True)
_app_log = os.path.join(_log_dir, "app.log")
_file_handler = RotatingFileHandler(_app_log, maxBytes=5 * 1024 * 1024, backupCount=5, encoding="utf-8")
_file_handler.setFormatter(logging.Formatter(_LOG_FMT))
logging.getLogger().addHandler(_file_handler)
logger = logging.getLogger("wechat-backend")
@@ -259,6 +267,15 @@ async def api_ws_status() -> dict:
return {"connected": is_ws_connected()}
def _proxy_from_env() -> str:
"""当登录页未填代理时,使用环境变量中的代理(服务器上设置 HTTP_PROXY/HTTPS_PROXY 后生效)。"""
return (
(os.environ.get("HTTPS_PROXY") or os.environ.get("https_proxy") or "").strip()
or (os.environ.get("HTTP_PROXY") or os.environ.get("http_proxy") or "").strip()
or ""
)
@app.post("/auth/qrcode")
async def get_login_qrcode(body: QrCodeRequest):
key = body.key
@@ -266,6 +283,11 @@ async def get_login_qrcode(body: QrCodeRequest):
raise HTTPException(status_code=400, detail="key is required")
payload = body.dict(exclude={"key"})
if not (payload.get("Proxy") or "").strip():
env_proxy = _proxy_from_env()
if env_proxy:
payload["Proxy"] = env_proxy
logger.info("GetLoginQrCodeNewDirect: using proxy from env (HTTP_PROXY/HTTPS_PROXY), len=%s", len(env_proxy))
url = f"{WECHAT_UPSTREAM_BASE_URL}/login/GetLoginQrCodeNewDirect"
logger.info("GetLoginQrCodeNewDirect: key=%s, payload=%s, url=%s", key, payload, url)
@@ -296,21 +318,30 @@ async def get_login_qrcode(body: QrCodeRequest):
)
logger.info(
"Upstream GetLoginQrCodeNewDirect success: status=%s, body=%s",
"Upstream GetLoginQrCodeNewDirect success: status=%s, body_len=%s",
resp.status_code,
body_text,
len(body_text),
)
data = resp.json()
# 保存 Data62(顶层 "Data62"),以 d000 标识移除尾部乱码
# 保存 Data62 完整原始数据,不清理不截断;仅做完整性校验供前端打印到操作日志
try:
data62 = (data.get("Data62") or "").strip()
if not data62 and isinstance(data.get("Data"), dict):
data62 = (data.get("Data").get("Data62") or data.get("Data").get("data62") or "").strip()
data62 = _clean_data62(data62)
qrcode_store[key] = {"data62": data62, "response": data}
data62_full = (data.get("Data62") or "").strip()
if not data62_full and isinstance(data.get("Data"), dict):
data62_full = (data.get("Data").get("Data62") or data.get("Data").get("data62") or "").strip()
qrcode_store[key] = {"data62": data62_full, "response": data}
data["_data62_stored"] = True
data["_data62_length"] = len(data62)
logger.info("Stored Data62 for key=%s (len=%s) from GetLoginQrCodeNewDirect top-level", key, len(data62))
data["_data62_length"] = len(data62_full)
check = _validate_data62(data62_full)
data["_data62_valid"] = check["valid"]
data["_data62_check"] = check["message"]
data["_data62_raw_length"] = check["raw_length"]
data["_data62_clean_length"] = len(data62_full)
data["_data62_preview"] = check["preview"]
logger.info(
"Stored Data62 (full) for key=%s (len=%s), valid=%s, check=%s",
key, len(data62_full), check["valid"], check["message"],
)
logger.info("Data62 full: %s", data62_full)
except Exception as e:
logger.warning("Store qrcode data for key=%s failed: %s", key, e)
return data
@@ -383,6 +414,32 @@ def _clean_data62(s: str) -> str:
return s
def _validate_data62(raw: str) -> dict:
"""检查 Data62 完整原始数据是否有效,不清理不截断,仅做格式与长度校验。"""
raw = (raw or "").strip()
raw_len = len(raw)
min_len = 32
hex_ok = bool(raw and all(c in "0123456789abcdefABCDEF" for c in raw))
length_ok = raw_len >= min_len
valid = bool(raw and hex_ok and length_ok)
if not raw:
msg = "无 Data62 或为空"
elif not hex_ok:
msg = "非十六进制格式"
elif not length_ok:
msg = f"长度不足({raw_len} < {min_len}"
else:
msg = "完整有效"
preview = (raw[:80] + "") if raw_len > 80 else (raw or "")
return {
"valid": valid,
"message": msg,
"raw_length": raw_len,
"clean_length": raw_len,
"preview": preview,
}
@app.get("/auth/scan-status")
async def check_scan_status(
key: str = Query(..., description="账号唯一标识"),
@@ -408,11 +465,11 @@ async def check_scan_status(
data = resp.json()
ticket = _extract_clean_ticket(data)
if ticket:
# data62 必须来自 GetLoginQrCodeNewDirect 返回的顶层 "Data62",不能使用 CheckLoginStatus 里data62(常为空);并去掉尾部乱码
# data62 使用完整原始数据,来自 GetLoginQrCodeNewDirect 的存储或本次响应Data62
stored = qrcode_store.get(key) or {}
data62 = _clean_data62(stored.get("data62") or "")
data62 = (stored.get("data62") or "").strip()
if not data62:
data62 = _clean_data62(data.get("Data62") or (data.get("Data") or {}).get("Data62") or (data.get("Data") or {}).get("data62") or "")
data62 = (data.get("Data62") or (data.get("Data") or {}).get("Data62") or (data.get("Data") or {}).get("data62") or "").strip()
params = {"key": SLIDER_VERIFY_KEY, "ticket": ticket}
if data62:
params["data62"] = data62
@@ -506,9 +563,8 @@ async def slider_form(
data62: str = Query("", description="Data62"),
ticket: str = Query(..., description="Original Ticket"),
):
"""本地滑块验证页:与 7765 同 DOM脚本经本机代理加载避免 CORS。"""
data62 = _clean_data62(data62)
return HTMLResponse(content=_slider_form_html(key, data62, ticket))
"""本地滑块验证页:与 7765 同 DOM脚本经本机代理加载避免 CORS。Data62 使用完整原始数据。"""
return HTMLResponse(content=_slider_form_html(key, data62.strip(), ticket))
# ---------- 滑块验证提交接口(代理 7765 ----------
@@ -525,7 +581,7 @@ async def api_slider_verify_get(
if not ticket_val:
raise HTTPException(status_code=400, detail="original_ticket or ticket required")
url = SLIDER_VERIFY_BASE_URL.rstrip("/") + "/"
params = {"key": key, "data62": _clean_data62(data62), "original_ticket": ticket_val}
params = {"key": key, "data62": (data62 or "").strip(), "original_ticket": ticket_val}
try:
async with httpx.AsyncClient(timeout=30.0) as client:
resp = await client.get(url, params=params)
@@ -553,7 +609,7 @@ async def api_slider_verify_post(body: SliderVerifyBody):
if not ticket_val:
raise HTTPException(status_code=400, detail="original_ticket or ticket required")
url = SLIDER_VERIFY_BASE_URL.rstrip("/") + "/"
params = {"key": body.key, "data62": _clean_data62(body.data62 or ""), "original_ticket": ticket_val}
params = {"key": body.key, "data62": (body.data62 or "").strip(), "original_ticket": ticket_val}
try:
async with httpx.AsyncClient(timeout=30.0) as client:
resp = await client.get(url, params=params)