2249 lines
94 KiB
Python
2249 lines
94 KiB
Python
import asyncio
|
||
import html
|
||
import logging
|
||
import os
|
||
from contextlib import asynccontextmanager
|
||
|
||
# Load the project-root .env first (independent of the current working
# directory) so that HTTP_PROXY/HTTPS_PROXY and similar settings take effect.
try:
    from dotenv import load_dotenv
    _root = os.path.abspath(os.path.join(os.path.dirname(__file__), ".."))
    load_dotenv(os.path.join(_root, ".env"))
except ImportError:
    # python-dotenv is optional; without it only the process environment is used.
    pass
|
||
from logging.handlers import RotatingFileHandler
|
||
from datetime import datetime
|
||
from typing import Any, Dict, List, Optional
|
||
from urllib.parse import urlencode
|
||
|
||
import httpx
|
||
from fastapi import FastAPI, HTTPException, Query, Request
|
||
from fastapi.middleware.cors import CORSMiddleware
|
||
from fastapi.responses import HTMLResponse, JSONResponse, Response
|
||
from fastapi.staticfiles import StaticFiles
|
||
from pydantic import BaseModel
|
||
|
||
try:
|
||
from backend import store
|
||
from backend.llm_client import chat as llm_chat
|
||
from backend.ws_sync import is_ws_connected, set_message_callback, start_ws_sync
|
||
except ImportError:
|
||
import store
|
||
from llm_client import chat as llm_chat
|
||
from ws_sync import is_ws_connected, set_message_callback, start_ws_sync
|
||
|
||
# Base URL of the upstream WeChat service used by the /auth/* endpoints.
WECHAT_UPSTREAM_BASE_URL = os.getenv("WECHAT_UPSTREAM_BASE_URL", "http://localhost:8080").rstrip("/")
# Base URL of the "7006" service (status checks, SetCallback, image sending).
CHECK_STATUS_BASE_URL = os.getenv("CHECK_STATUS_BASE_URL", "http://113.44.162.180:7006").rstrip("/")
# Real-time message callback: when set, 7006 POSTs new messages to this address,
# which then acts as the primary receive path (consistent with SetCallback).
CALLBACK_BASE_URL = (os.getenv("CALLBACK_BASE_URL") or "").strip().rstrip("/")
SLIDER_VERIFY_BASE_URL = os.getenv("SLIDER_VERIFY_BASE_URL", "http://113.44.162.180:7765").rstrip("/")
# Key for the slider service on 7765. It is unrelated to the account key; the
# default is the provider's QQ number (required when using their public service).
SLIDER_VERIFY_KEY = os.getenv("SLIDER_VERIFY_KEY", "408449830")
# Send text message: in swagger this is POST /message/SendTextMessage, with a
# SendMessageModel body (an array of MsgItem).
SEND_MSG_PATH = (os.getenv("SEND_MSG_PATH") or "/message/SendTextMessage").strip()
# Send image message: on 7006 this is /message/SendImageMessage; the MsgItem
# body carries ImageContent and MsgType=0.
SEND_IMAGE_UPSTREAM_BASE_URL = (os.getenv("SEND_IMAGE_UPSTREAM_BASE_URL") or "").strip() or CHECK_STATUS_BASE_URL
SEND_IMAGE_PATH = (os.getenv("SEND_IMAGE_PATH") or "").strip() or "/message/SendImageMessage"
# MsgType for image messages: 0 for 7006 SendImageMessage.
# NOTE(review): int() raises at import time if IMAGE_MSG_TYPE is not numeric.
IMAGE_MSG_TYPE = int(os.getenv("IMAGE_MSG_TYPE", "0"))
|
||
|
||
# Per-key cache of the QR-code response and its Data62, for use by later steps.
qrcode_store: dict = {}

# Per-key contact index: name (WeChat ID / nickname / remark) -> contact details.
_contact_index: Dict[str, Dict[str, dict]] = {}

_LOG_FMT = "%(asctime)s [%(levelname)s] %(name)s - %(message)s"
logging.basicConfig(level=logging.INFO, format=_LOG_FMT)
# Persist logs to data/logs/app.log for troubleshooting (LOG_DIR overrides the
# directory, DATA_DIR overrides the data root).
_data_dir = os.getenv("DATA_DIR") or os.path.join(os.path.dirname(__file__), "data")
_log_dir = os.getenv("LOG_DIR") or os.path.join(_data_dir, "logs")
os.makedirs(_log_dir, exist_ok=True)
_app_log = os.path.join(_log_dir, "app.log")
# Rotate at 5 MiB, keeping five backups.
_file_handler = RotatingFileHandler(_app_log, maxBytes=5 * 1024 * 1024, backupCount=5, encoding="utf-8")
_file_handler.setFormatter(logging.Formatter(_LOG_FMT))
# Attached to the root logger so every module's records reach the file.
logging.getLogger().addHandler(_file_handler)

logger = logging.getLogger("wechat-backend")
|
||
|
||
|
||
def _is_self_sent(msg: dict) -> bool:
    """Return True when *msg* was sent by the logged-in account itself.

    Such messages must not trigger an AI reply. A message counts as self-sent
    when its ``direction`` is ``"out"`` or its ``IsSelf`` flag is 1, True or "1".
    """
    outbound = msg.get("direction") == "out"
    flagged_as_self = msg.get("IsSelf") in (1, True, "1")
    return outbound or flagged_as_self
|
||
|
||
|
||
def _allowed_ai_reply(key: str, from_user: str) -> bool:
    """Decide whether *from_user* may receive an AI reply for account *key*.

    Only super administrators and whitelisted contacts from the stored AI
    reply configuration are allowed; everyone else gets no reply. A blank
    sender or a missing configuration yields False.
    """
    sender = (from_user or "").strip()
    if not sender:
        return False

    config = store.get_ai_reply_config(key)
    if not config:
        logger.debug("AI reply skipped: no config for key=%s (请在管理页「AI 回复设置」保存超级管理员/白名单)", key[:8])
        return False

    permitted = set(config.get("super_admin_wxids") or []) | set(config.get("whitelist_wxids") or [])
    if sender in permitted:
        return True

    logger.debug("AI reply skipped: from_user=%s not in whitelist/super_admin for key=%s", from_user[:20], key[:8])
    return False
|
||
|
||
|
||
def _is_super_admin(key: str, from_user: str) -> bool:
    """Return True when *from_user* is a super administrator of account *key*.

    Only super administrators may trigger function calls such as sending a
    message on someone's behalf; whitelisted contacts get plain replies only.
    """
    sender = (from_user or "").strip()
    if not sender:
        return False

    config = store.get_ai_reply_config(key)
    if not config:
        return False

    return sender in set(config.get("super_admin_wxids") or [])
|
||
|
||
|
||
def _parse_llm_action(raw: str) -> Any:
    """Parse the model output as JSON, tolerating a surrounding Markdown fence.

    Models often wrap JSON in ``` or ```json fences even when told not to;
    the fence is stripped before parsing.

    Raises:
        ValueError: if the (unfenced) text is not valid JSON.
    """
    import json as _json

    text = raw.strip()
    if text.startswith("```"):
        # Drop the opening fence line (``` or ```json) ...
        newline = text.find("\n")
        text = text[newline + 1:] if newline != -1 else text[3:]
        # ... and the closing fence, if present.
        text = text.rstrip()
        if text.endswith("```"):
            text = text[:-3]
    return _json.loads(text)


async def _ai_takeover_reply(key: str, from_user: str, content: str) -> None:
    """Let the AI take over a conversation when someone else sends a message.

    Builds a short context from recent synced messages, asks the LLM for a
    JSON "action" and then either replies to *from_user* (``type=reply``) or
    sends a message to another contact (``type=send_message``, super
    administrators only). Output that is not valid JSON is sent back verbatim
    as a plain reply. All errors are logged and swallowed so a failed reply
    never breaks message ingestion.

    Args:
        key: Account key the message belongs to.
        from_user: wxid of the contact who sent the message.
        content: Text of the incoming message.
    """
    if not from_user or not content or not content.strip():
        return
    try:
        recent = store.list_sync_messages(key, limit=10)
        # Use only the last few messages exchanged with this user as context.
        # NOTE(review): whether the result is chronological depends on the
        # ordering returned by store.list_sync_messages - confirm.
        context = []
        for m in reversed(recent):
            c = (m.get("Content") or m.get("content") or "").strip()
            if not c:
                continue
            if m.get("direction") == "out" and (m.get("ToUserName") or "").strip() == from_user:
                context.append({"role": "assistant", "content": c})
            elif (m.get("FromUserName") or m.get("from") or "").strip() == from_user and not _is_self_sent(m):
                context.append({"role": "user", "content": c})
            if len(context) >= 6:
                break
        # Make sure the conversation ends with the message being answered.
        if not context or context[-1].get("role") != "user":
            context.append({"role": "user", "content": content})

        # Function-call style: the model chooses between a direct reply and
        # an action such as sending a message on the user's behalf.
        system_prompt = (
            "你是微信客服助手,负责根据用户消息决定是直接回复,还是调用内置动作。"
            "只用 JSON 回复,格式如下(不要多余文本):\n"
            "{\n"
            ' \"type\": \"reply\" | \"send_message\",\n'
            ' \"reply\"?: \"直接回复给当前联系人的内容(当 type=reply 时必填)\",\n'
            ' \"target_wxid\"?: \"要代发消息的对象 wxid(当 type=send_message 时必填,可以是当前联系人或其他 wxid)\",\n'
            ' \"content\"?: \"要代发的消息内容(当 type=send_message 时必填)\"\n'
            "}\n"
            "注意:\n"
            "1)如果用户只是正常聊天,优先使用 type=reply;\n"
            "2)只有当用户明确指令你“帮我给某人发消息/转告/通知 xxx”时,才使用 type=send_message;\n"
            "3)严禁凭空编造 target_wxid,若无法确定对方 wxid,请继续使用 type=reply 询问用户;\n"
            "4)不要用自然语言解释 JSON,只输出 JSON 本身。"
        )
        messages = [{"role": "system", "content": system_prompt}, *context]

        raw = await llm_chat(messages)
        if not raw or not raw.strip():
            return

        try:
            action = _parse_llm_action(raw)
        except ValueError:
            # Not JSON at all: fall back to sending the text as a plain reply.
            reply_text = raw.strip()
            await _send_message_upstream(key, from_user, reply_text)
            logger.info("AI takeover replied (fallback) to %s: %s", from_user[:20], reply_text[:50])
            return

        if not isinstance(action, dict) or "type" not in action:
            return

        key_tail = key[-4:] if len(key) >= 4 else "****"
        action_type = str(action.get("type") or "").strip()
        if action_type == "send_message":
            if not _is_super_admin(key, from_user):
                await _send_message_upstream(
                    key, from_user,
                    "代发消息等操作仅限超级管理员使用,您可正常聊天获得回复。"
                )
                logger.info("AI send_message rejected: from_user not super_admin (key=***%s)", key_tail)
                return
            target = str(action.get("target_wxid") or "").strip()
            msg = str(action.get("content") or "").strip()
            if target and msg:
                # Resolve a nickname / remark / WeChat ID to the real wxid first.
                real_wxid = await _resolve_contact_username(key, target)
                if not real_wxid:
                    # Tell the requester that the contact could not be found.
                    warn = f"未找到名为「{target}」的联系人,请确认称呼或直接提供对方微信号。"
                    await _send_message_upstream(key, from_user, warn)
                    logger.info("AI send_message resolve failed for %s (key=***%s)", target, key_tail)
                    return
                await _send_message_upstream(key, real_wxid, msg)
                logger.info("AI function-call send_message to %s (raw=%s): %s", real_wxid[:20], target[:20], msg[:50])
        elif action_type == "reply":
            reply_text = str(action.get("reply") or "").strip()
            if reply_text:
                await _send_message_upstream(key, from_user, reply_text)
                logger.info("AI takeover replied to %s: %s", from_user[:20], reply_text[:50])
    except Exception as e:
        logger.exception("AI takeover reply error (from=%s): %s", from_user, e)
|
||
|
||
|
||
# Strong references to in-flight AI reply tasks. The event loop only keeps weak
# references, so an unreferenced task may be garbage-collected mid-execution.
_ai_reply_tasks: set = set()


def _as_stripped_text(value: Any) -> str:
    """Return *value* stripped when it is a string, otherwise an empty string."""
    return value.strip() if isinstance(value, str) else ""


def _maybe_schedule_ai_reply(key: str, m: Any) -> None:
    """Schedule an AI reply for one incoming message if it qualifies.

    A message qualifies when it is a dict, was not sent by the account itself,
    has a sender and text content, is a text message (MsgType 1 or missing)
    and the sender passes ``_allowed_ai_reply``.
    """
    if not isinstance(m, dict) or _is_self_sent(m):
        return
    # NOTE(review): _unwrap_wechat_field is only consulted for the snake_case
    # "from_user_name" key and for a falsy "content" - confirm that is intended.
    from_user = _as_stripped_text(
        m.get("FromUserName") or m.get("from") or _unwrap_wechat_field(m.get("from_user_name")) or ""
    )
    content = _as_stripped_text(
        m.get("Content") or m.get("content") or _unwrap_wechat_field(m.get("content")) or ""
    )
    msg_type = m.get("MsgType") or m.get("msgType")
    is_text = msg_type in (1, None) or str(msg_type) == "1"  # only text triggers the AI
    if not (from_user and content and is_text):
        return
    if not _allowed_ai_reply(key, from_user):
        return
    try:
        loop = asyncio.get_running_loop()
    except RuntimeError:
        # Called outside an event loop: nothing can be scheduled (best effort).
        logger.debug("AI reply not scheduled: no running event loop")
        return
    task = loop.create_task(_ai_takeover_reply(key, from_user, content))
    _ai_reply_tasks.add(task)
    task.add_done_callback(_ai_reply_tasks.discard)


def _on_ws_message(key: str, data: Any) -> None:
    """Handle data from GetSyncMsg or the callback endpoint.

    The payload is written to the store and every message from someone else
    is offered to the AI takeover. Three payload shapes are accepted:

    1. a dict wrapping a non-empty list under ``MsgList`` / ``List`` / ``msgList``;
    2. a bare list of messages (e.g. the callback's normalised ``[msg]``);
    3. anything else, which is stored as a single message.
    """
    if isinstance(data, dict):
        wrapped = data.get("MsgList") or data.get("List") or data.get("msgList")
        messages = wrapped if isinstance(wrapped, list) and wrapped else [data]
    elif isinstance(data, list):
        messages = data
    else:
        messages = [data]

    store.append_sync_messages(key, messages)
    for m in messages:
        _maybe_schedule_ai_reply(key, m)
|
||
|
||
|
||
async def _register_message_callback(key: str) -> bool:
    """Register this service's message callback with 7006 for account *key*.

    Issues ``POST /message/SetCallback?key=...`` so that 7006 pushes new
    messages to ``{CALLBACK_BASE_URL}/api/callback/wechat-message``.

    Returns:
        True when the upstream answered with a status below 400. False when
        no callback base URL or key is configured, the upstream rejected the
        request, or the request failed.
    """
    if not (CALLBACK_BASE_URL and key):
        return False

    key_tail = key[-4:] if len(key) >= 4 else "****"
    url = f"{CHECK_STATUS_BASE_URL.rstrip('/')}/message/SetCallback"
    callback_url = f"{CALLBACK_BASE_URL.rstrip('/')}/api/callback/wechat-message"
    body = {"CallbackURL": callback_url, "Enabled": True}
    try:
        async with httpx.AsyncClient(trust_env=False, timeout=10.0) as client:
            resp = await client.post(url, params={"key": key}, json=body)
        if resp.status_code < 400:
            logger.info("SetCallback registered for key=***%s, CallbackURL=%s", key_tail, callback_url)
            return True
        logger.warning("SetCallback %s key=%s: %s %s", url, key_tail, resp.status_code, resp.text[:200])
        return False
    except Exception as e:
        logger.warning("SetCallback error key=%s: %s", key_tail, e)
        return False
|
||
|
||
|
||
async def _greeting_text_for(customer: dict, template: str, use_qwen: bool, task_id: Any) -> str:
    """Build the greeting text for one customer (LLM-generated or from the template)."""
    wxid = customer.get("wxid")
    remark_name = (customer.get("remark_name") or "").strip() or wxid
    templated = template.replace("{{name}}", remark_name)
    if not use_qwen:
        return templated

    user = f"请生成一句简短的微信问候语(1-2句话),客户备注名:{remark_name}"
    region = (customer.get("region") or "").strip()
    if region:
        user += f",地区:{region}"
    tags = customer.get("tags") or []
    if tags:
        # str() keeps a non-string tag from aborting the whole task.
        user += f",标签:{','.join(str(t) for t in tags)}"
    user += "。不要解释,只输出问候语本身。"
    try:
        content = await llm_chat([{"role": "user", "content": user}])
    except Exception as e:
        logger.warning("Greeting task %s llm_chat error: %s", task_id, e)
        return templated
    if not content or not content.strip():
        return templated
    return content


async def _execute_greeting_task_if_due(task: dict, now: datetime) -> None:
    """Send one greeting task to its matching customers if it is due, then mark it executed."""
    if not task.get("enabled") or task.get("executed_at"):
        return
    send_time = task.get("send_time") or task.get("cron")
    if not send_time:
        return
    dt = _parse_send_time(send_time)
    if not dt or dt > now:
        return

    task_id = task.get("id")
    key = task.get("key")
    customer_tags = set(task.get("customer_tags") or [])
    template = (task.get("template") or "").strip() or "{{name}},您好!"
    use_qwen = bool(task.get("use_qwen"))

    customers = store.list_customers(key)
    if customer_tags:
        customers = [c for c in customers if set(c.get("tags") or []) & customer_tags]

    for c in customers:
        wxid = c.get("wxid")
        if not wxid:
            continue
        content = await _greeting_text_for(c, template, use_qwen, task_id)
        try:
            await _send_message_upstream(key, wxid, content)
            logger.info("Greeting task %s sent to %s", task_id, wxid)
        except Exception as e:
            # One failed recipient must not stop the remaining ones.
            logger.warning("Greeting task %s send to %s failed: %s", task_id, wxid, e)

    # Marked as executed even if some sends failed, so the task never re-fires.
    store.update_greeting_task(task_id, executed_at=now.isoformat(), enabled=False)
    logger.info("Greeting task %s executed_at set", task_id)


async def _run_greeting_scheduler() -> None:
    """Periodically run due greeting tasks until cancelled.

    Every 30 seconds all greeting tasks are loaded from the store; each
    enabled, not-yet-executed task whose send time has passed is sent to the
    customers matching its tags and then marked executed. Each task is
    isolated: a failure in one task is logged and does not prevent the
    remaining tasks of the same pass from running.
    """
    check_interval = 30
    while True:
        try:
            await asyncio.sleep(check_interval)
            now = datetime.now()
            for task in store.list_greeting_tasks(key=None):
                try:
                    await _execute_greeting_task_if_due(task, now)
                except asyncio.CancelledError:
                    raise
                except Exception as e:
                    task_id = task.get("id") if isinstance(task, dict) else None
                    logger.exception("Greeting task %s failed: %s", task_id, e)
        except asyncio.CancelledError:
            break
        except Exception as e:
            logger.exception("Greeting scheduler error: %s", e)
|
||
|
||
|
||
@asynccontextmanager
async def lifespan(app: FastAPI):
    """Application lifespan: wire up message intake and the greeting scheduler.

    On startup the message callback is installed. When ``CALLBACK_BASE_URL``
    is configured and at least one key is known, the real-time callback is
    registered for every key and the WS GetSyncMsg loop is not started;
    otherwise the WS sync loop runs. The greeting scheduler always runs.
    On shutdown, including a failed startup of the application, every
    background task started here is cancelled and awaited.
    """
    set_message_callback(_on_ws_message)
    _callback_key = (os.getenv("WECHAT_WS_KEY") or os.getenv("KEY") or os.getenv("WS_KEY") or "").strip()

    use_ws_sync = True
    if CALLBACK_BASE_URL:
        keys_to_register = set(store.list_all_keys() or [])
        if _callback_key:
            keys_to_register.add(_callback_key)
        for k in keys_to_register:
            if k and k.strip():
                ok = await _register_message_callback(k.strip())
                if ok:
                    logger.info("启动时已注册回调 key=***%s", k[-4:] if len(k) >= 4 else "****")
        if keys_to_register:
            logger.info("消息接收已切换为实时回调入口,不再启动 WS GetSyncMsg")
            use_ws_sync = False

    # Keep strong references: the event loop only holds weak references to
    # tasks, so an unreferenced task could be garbage-collected while running.
    background_tasks = []
    if use_ws_sync:
        background_tasks.append(asyncio.create_task(start_ws_sync()))
    background_tasks.append(asyncio.create_task(_run_greeting_scheduler()))
    try:
        yield
    finally:
        for task in background_tasks:
            task.cancel()
        # return_exceptions=True absorbs the CancelledError of each task.
        await asyncio.gather(*background_tasks, return_exceptions=True)
|
||
|
||
|
||
# The ASGI application; startup and shutdown work is handled by `lifespan`.
app = FastAPI(title="WeChat Admin Backend (FastAPI)", lifespan=lifespan)

# CORS is fully open: any origin, method and header, with credentials allowed.
# NOTE(review): wildcard origins combined with allow_credentials=True is very
# permissive - confirm this is acceptable for the deployment.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)
|
||
|
||
|
||
# Request body of POST /auth/qrcode.
class QrCodeRequest(BaseModel):
    key: str
    Proxy: Optional[str] = ""
    IpadOrmac: Optional[str] = ""
    Check: Optional[bool] = False
    # Pass True only when a slider check is required and it is the "no digits"
    # variant: the QR code is then re-fetched as a Mac device. In every other
    # case use iPad; passing Mac easily gets the account banned.
    force_mac: Optional[bool] = False
|
||
|
||
|
||
class WakeUpRequest(BaseModel):
    """Wake-up login (QR-code login only): calls only 7006 WakeUpLogin and does not fetch a QR code."""
    key: str
    Check: Optional[bool] = False
    IpadOrmac: Optional[str] = "ipad"
    Proxy: Optional[str] = ""
|
||
|
||
|
||
class VerifyCodeRequest(BaseModel):
    """Phone verification code check: by default only key + code are needed; data62/ticket are preferably filled in from the cache."""
    key: str
    code: str
    data62: Optional[str] = ""
    ticket: Optional[str] = ""
|
||
|
||
|
||
@app.middleware("http")
async def log_requests(request: Request, call_next):
    """HTTP middleware that logs each request on arrival and its status code on completion."""
    method = request.method
    path = request.url.path
    client_host = request.client.host if request.client else "-"

    logger.info("HTTP %s %s from %s", method, path, client_host)
    response = await call_next(request)
    logger.info("HTTP %s %s -> %s", method, path, response.status_code)
    return response
|
||
|
||
|
||
@app.get("/health")
async def health() -> dict:
    """Liveness probe: reports a static OK status plus the configured upstream base URL."""
    logger.info("Health check")
    report = {
        "status": "ok",
        "backend": "fastapi",
        "upstream": WECHAT_UPSTREAM_BASE_URL,
    }
    return report
|
||
|
||
|
||
@app.get("/api/ws-status")
async def api_ws_status() -> dict:
    """Report the WS (GetSyncMsg) connection state so the frontend can redirect to login when it drops."""
    connected = is_ws_connected()
    return {"connected": connected}
|
||
|
||
|
||
# Proxy check: fetch a test URL through the current proxy to verify it works.
PROXY_CHECK_URL = os.getenv("PROXY_CHECK_URL", "https://httpbin.org/ip")


@app.get("/api/check-proxy")
async def api_check_proxy(proxy: Optional[str] = Query(None, description="可选,指定要检测的代理 URL(不传则用环境变量/隧道/KDL)")):
    """Check whether a proxy is usable.

    The proxy comes from the ``proxy`` query parameter or, when that is
    empty, from the automatic resolution chain (environment, fixed tunnel,
    KDL API). ``PROXY_CHECK_URL`` is fetched through it and the result dict
    reports connectivity, the egress IP when it can be parsed, and the
    failure reason otherwise.
    """

    def _extract_origin_ip(resp: httpx.Response) -> Optional[str]:
        """Return the ``origin`` field of an httpbin /ip style response, or None."""
        try:
            parsed = resp.json()
            if isinstance(parsed, dict):
                return (parsed.get("origin") or "").strip() or None
        except Exception:
            pass
        return None

    def _format_proxy_result(ok: bool, *, source: str, preview: str, status_code: Optional[int] = None,
                             origin_ip: Optional[str] = None, error: Optional[str] = None,
                             note: Optional[str] = None) -> dict:
        """Assemble the response payload; optional fields are included only when set."""
        result = {
            "ok": ok,
            "status": "available" if ok else "unavailable",
            "source": source,
            "proxy_preview": preview,
            "check_url": PROXY_CHECK_URL,
        }
        if status_code is not None:
            result["status_code"] = status_code
        if origin_ip:
            result["origin_ip"] = origin_ip
        if note:
            result["note"] = note
        if error:
            result["error"] = error
            # Give an actionable hint for the common SOCKS authentication failure.
            if "User was rejected by the SOCKS5 server" in error:
                result["reason"] = "proxy_auth_rejected"
                result["suggestion"] = "请检查代理用户名/密码、端口、以及代理服务商白名单配置"
        return result

    def _preview(u: str) -> str:
        """Mask the password of a proxy URL for display."""

        def _clip(text: str, limit: int) -> str:
            return text[:limit] + "…" if len(text) > limit else text

        if not u or "@" not in u:
            return _clip(u, 50)
        credentials, host = u.rsplit("@", 1)
        scheme, separator, userinfo = credentials.partition("://")
        if not separator:
            masked = "***"
        elif ":" in userinfo:
            masked = scheme + separator + userinfo.split(":", 1)[0] + ":***"
        else:
            masked = scheme + separator + "***"
        return masked + "@" + _clip(host, 30)

    # Work out which proxy to test and where it came from.
    proxy_url = (proxy or "").strip()
    source = "query"
    if not proxy_url:
        resolved = _resolve_proxy("", allow_auto=True)
        if resolved == "__tunnel__":
            proxy_url = _proxy_from_tunnel()
            source = "tunnel"
            logger.info("check-proxy: using tunnel -> %s", "socks5h://***@%s/" % TUNNEL_PROXY)
        elif resolved == "__kdl__":
            proxy_url = await _proxy_from_kdl()
            source = "kdl"
            logger.info("check-proxy: using kdl -> %s", "http://***@%s/" % (proxy_url.split("@", 1)[-1].rstrip("/") if proxy_url else "?"))
        else:
            proxy_url = resolved
            source = "env" if proxy_url else "none"
            if proxy_url:
                logger.info("check-proxy: using env/body, len=%s", len(proxy_url))

    if not proxy_url:
        return _format_proxy_result(
            False,
            source="none",
            preview="(empty)",
            error="未配置代理。请填写代理、或设置 HTTP_PROXY/HTTPS_PROXY、或配置 TUNNEL_PROXY(固定隧道)、或 KDL 代理 API。",
        )

    preview = _preview(proxy_url)
    logger.info("check-proxy: source=%s, proxy_preview=%s", source, preview)

    def _outcome(resp: httpx.Response, ok_log: str, fail_log: str) -> dict:
        """Turn the test response into a result payload, logging with the given formats."""
        code = resp.status_code
        if code == 200:
            origin_ip = _extract_origin_ip(resp)
            logger.info(ok_log, code)
            return _format_proxy_result(
                True, source=source, preview=preview, status_code=code, origin_ip=origin_ip
            )
        logger.warning(fail_log, code)
        return _format_proxy_result(
            False, source=source, preview=preview, status_code=code,
            error=f"请求测试页返回 {code}"
        )

    if proxy_url.strip().lower().startswith("socks5"):
        try:
            from httpx_socks import AsyncProxyTransport

            # httpx_socks/urllib only recognise socks5, so map socks5h to socks5.
            transport_url = proxy_url.strip()
            if transport_url.lower().startswith("socks5h://"):
                transport_url = "socks5://" + transport_url[len("socks5h://"):]
            transport = AsyncProxyTransport.from_url(transport_url)
            async with httpx.AsyncClient(trust_env=False, timeout=15.0, transport=transport) as client:
                resp = await client.get(PROXY_CHECK_URL)
            return _outcome(resp, "check-proxy: ok (socks), status=%s", "check-proxy: fail (socks), status=%s")
        except ImportError:
            # Without httpx-socks the proxy is reported as configured but untested.
            logger.info("check-proxy: socks5 已配置,跳过连通性检测(需 pip install httpx-socks 方可检测)")
            return _format_proxy_result(
                True, source=source, preview=preview,
                note="socks5 代理已配置;连通性检测需安装 pip install httpx-socks"
            )
        except Exception as e:
            logger.warning("check-proxy: socks exception %s", e)
            return _format_proxy_result(False, source=source, preview=preview, error=str(e))

    try:
        async with httpx.AsyncClient(trust_env=False, timeout=15.0, proxy=proxy_url) as client:
            resp = await client.get(PROXY_CHECK_URL)
        return _outcome(resp, "check-proxy: ok, status=%s", "check-proxy: fail, status=%s")
    except Exception as e:
        logger.warning("check-proxy: exception %s", e)
        return _format_proxy_result(False, source=source, preview=preview, error=str(e))
|
||
|
||
|
||
def _proxy_from_env() -> str:
    """Return the proxy configured through environment variables, or "".

    Used when the login page supplied no proxy. The HTTPS variables are
    consulted before the HTTP ones, and within each pair the upper-case name
    takes precedence over the lower-case one.
    """
    env = os.environ
    for upper, lower in (("HTTPS_PROXY", "https_proxy"), ("HTTP_PROXY", "http_proxy")):
        candidate = (env.get(upper) or env.get(lower) or "").strip()
        if candidate:
            return candidate
    return ""
|
||
|
||
|
||
# Fixed tunnel proxy (socks5h with username/password): used when neither the
# login page nor the environment variables supply a proxy.
# Same format as the requests example: socks5h://user:pwd@host:port/
TUNNEL_PROXY = (os.getenv("TUNNEL_PROXY") or "").strip()  # e.g. 218.78.109.253:16816
TUNNEL_PROXY_USERNAME = (os.getenv("TUNNEL_PROXY_USERNAME") or "").strip()
TUNNEL_PROXY_PASSWORD = (os.getenv("TUNNEL_PROXY_PASSWORD") or "").strip()

# KDL (Kuaidaili) proxy API, optional: proxies are fetched from this endpoint
# when no tunnel is configured.
KDL_PROXY_API_URL = (os.getenv("KDL_PROXY_API_URL") or "").strip()
KDL_PROXY_USERNAME = (os.getenv("KDL_PROXY_USERNAME") or "").strip()
KDL_PROXY_PASSWORD = (os.getenv("KDL_PROXY_PASSWORD") or "").strip()
|
||
|
||
# Print the proxy configuration at startup (passwords are never printed).
def _log_proxy_config() -> None:
    """Log whether the tunnel and KDL proxy settings are completely configured."""
    tunnel_ready = all((TUNNEL_PROXY, TUNNEL_PROXY_USERNAME, TUNNEL_PROXY_PASSWORD))
    kdl_ready = all((KDL_PROXY_API_URL, KDL_PROXY_USERNAME, KDL_PROXY_PASSWORD))
    tunnel_shown = TUNNEL_PROXY or "(empty)"
    kdl_shown = "set" if KDL_PROXY_API_URL else "(empty)"
    logger.info(
        "proxy config: tunnel=%s (TUNNEL_PROXY=%s), kdl=%s (KDL_API=%s)",
        tunnel_ready,
        tunnel_shown,
        kdl_ready,
        kdl_shown,
    )


_log_proxy_config()
|
||
|
||
|
||
def _proxy_from_tunnel() -> str:
    """Build the fixed tunnel proxy URL ``socks5h://user:pwd@host:port/`` for 7006.

    The tunnel credentials fall back to the KDL credentials when unset.
    Characters that are not legal in the userinfo part of a URL (such as
    ``@``, ``:`` or ``/``) are percent-encoded so that a password containing
    them still yields a parseable URL; ordinary credentials are unchanged.

    Returns:
        The proxy URL, or "" when the tunnel host or the credentials are missing.
    """
    from urllib.parse import quote

    if not TUNNEL_PROXY:
        return ""
    user = (TUNNEL_PROXY_USERNAME or KDL_PROXY_USERNAME or "").strip()
    pwd = (TUNNEL_PROXY_PASSWORD or KDL_PROXY_PASSWORD or "").strip()
    if not user or not pwd:
        return ""
    # Sub-delimiters are legal in userinfo (RFC 3986) and are left untouched.
    userinfo_safe = "!$&'()*+,;="
    return "socks5h://%(user)s:%(pwd)s@%(proxy)s/" % {
        "user": quote(user, safe=userinfo_safe),
        "pwd": quote(pwd, safe=userinfo_safe),
        "proxy": TUNNEL_PROXY,
    }
|
||
|
||
|
||
async def _proxy_from_kdl() -> str:
    """Fetch one proxy from the KDL API and format it as ``http://user:pwd@ip:port/`` for 7006.

    Only the first non-empty line of the API response is used, so a response
    listing several proxies still yields a single valid URL. Credentials are
    percent-encoded where needed so characters such as ``@`` or ``:`` cannot
    break the URL.

    Returns:
        The proxy URL, or "" when KDL is not fully configured, the response
        is empty, or the request fails (the failure is logged).
    """
    from urllib.parse import quote

    if not KDL_PROXY_API_URL or not KDL_PROXY_USERNAME or not KDL_PROXY_PASSWORD:
        return ""
    try:
        async with httpx.AsyncClient(trust_env=False, timeout=10.0) as client:
            resp = await client.get(KDL_PROXY_API_URL)
        resp.raise_for_status()
        lines = (line.strip() for line in (resp.text or "").splitlines())
        proxy_ip = next((line for line in lines if line), "")
        if not proxy_ip:
            return ""
        # Sub-delimiters are legal in userinfo (RFC 3986) and are left untouched.
        userinfo_safe = "!$&'()*+,;="
        return "http://%(user)s:%(pwd)s@%(proxy)s/" % {
            "user": quote(KDL_PROXY_USERNAME, safe=userinfo_safe),
            "pwd": quote(KDL_PROXY_PASSWORD, safe=userinfo_safe),
            "proxy": proxy_ip,
        }
    except Exception as e:
        logger.warning("KDL proxy fetch failed: %s", e)
        return ""
|
||
|
||
|
||
# If the tunnel proxy arrives in http form (from the login page or from KDL),
# it is rewritten to socks5h before being passed to 7006. This is the tunnel
# host:port that triggers the rewrite.
TUNNEL_PROXY_NORMALIZE_HOST = (os.getenv("TUNNEL_PROXY_NORMALIZE_HOST") or "218.78.109.253:16816").strip()
|
||
|
||
|
||
def _proxy_preview_for_log(proxy: str) -> str:
    """Return a log-safe rendering of a proxy URL with the password masked.

    Empty or non-string input yields ``"(empty)"``. A URL without credentials
    is returned as is, clipped to 50 characters. Otherwise the password
    becomes ``***``, the host part is cut at the first ``/`` or ``?`` and
    clipped to 40 characters, and a trailing ``/`` is appended.
    """
    if not isinstance(proxy, str):
        return "(empty)"
    url = proxy.strip()
    if not url:
        return "(empty)"
    if "@" not in url:
        return url if len(url) <= 50 else url[:50] + "…"

    credentials, _, host = url.rpartition("@")
    host = host.rstrip("/").split("/")[0].split("?")[0]

    scheme, separator, userinfo = credentials.partition("://")
    if separator:
        user = userinfo.split(":", 1)[0] if ":" in userinfo else "***"
        masked = f"{scheme}{separator}{user}:***"
    else:
        masked = "***"

    shown_host = host if len(host) <= 40 else host[:40] + "…"
    return f"{masked}@{shown_host}/"
|
||
|
||
|
||
def _normalize_proxy_scheme_to_socks5h(proxy: str) -> str:
    """Rewrite ``http://`` to ``socks5h://`` when the proxy points at the tunnel host.

    7006 needs the tunnel proxy in socks5h form. A proxy is rewritten only if
    it starts with ``http://``, carries credentials, and its host:port equals
    ``TUNNEL_PROXY`` or ``TUNNEL_PROXY_NORMALIZE_HOST``; anything else is
    returned stripped but otherwise unchanged. Empty or non-string input is
    returned as is.
    """
    if not proxy or not isinstance(proxy, str):
        return proxy
    p = proxy.strip()
    if not p.startswith("http://") or "@" not in p:
        return p
    # rsplit: the host follows the LAST "@", so a password containing "@" is fine.
    host_part = p.rsplit("@", 1)[1].rstrip("/").split("/")[0].split("?")[0]
    tunnel_hosts = {h for h in (TUNNEL_PROXY, TUNNEL_PROXY_NORMALIZE_HOST) if h}
    if host_part not in tunnel_hosts:
        return p
    out = "socks5h://" + p[len("http://"):]
    logger.info("proxy normalize: http -> socks5h for tunnel %s", host_part)
    return out
|
||
|
||
|
||
def _resolve_proxy(body_proxy: str, *, allow_auto: bool = True) -> str:
    """Pick the proxy to hand to 7006.

    Precedence: request body, then environment variables, then the fixed
    tunnel, then (optionally) the KDL API. For the two automatic sources a
    sentinel is returned (``"__tunnel__"`` or ``"__kdl__"``) and the caller
    is expected to build the actual URL.

    Args:
        body_proxy: Proxy supplied by the client; may be empty.
        allow_auto: When False, only the body and the environment are considered.

    Returns:
        A proxy URL, one of the sentinels, or "" when nothing is configured.
    """
    explicit = (body_proxy or "").strip()
    if explicit:
        logger.debug("proxy resolve: from body, len=%s", len(explicit))
        return explicit

    from_env = _proxy_from_env()
    if from_env:
        logger.debug("proxy resolve: from env (HTTP_PROXY/HTTPS_PROXY), len=%s", len(from_env))
        return from_env

    if not allow_auto:
        return ""

    # Tunnel first: TUNNEL_PROXY must be set and credentials must be
    # available, falling back to the KDL_* credentials.
    tunnel_user = (TUNNEL_PROXY_USERNAME or KDL_PROXY_USERNAME or "").strip()
    tunnel_pwd = (TUNNEL_PROXY_PASSWORD or KDL_PROXY_PASSWORD or "").strip()
    tunnel_usable = bool(TUNNEL_PROXY and tunnel_user and tunnel_pwd)
    kdl_usable = bool(KDL_PROXY_API_URL and KDL_PROXY_USERNAME and KDL_PROXY_PASSWORD)

    if tunnel_usable:
        logger.info("proxy resolve: auto -> tunnel (socks5h), TUNNEL_PROXY=%s", TUNNEL_PROXY)
        return "__tunnel__"
    if kdl_usable:
        logger.info("proxy resolve: auto -> kdl (fetch from API)")
        return "__kdl__"

    logger.debug("proxy resolve: no auto proxy configured")
    return ""
|
||
|
||
|
||
@app.post("/auth/wake")
async def wake_up_login(body: WakeUpRequest):
    """Wake-up login: call only upstream /login/WakeUpLogin (QR-code login only); no QR code is fetched.

    The proxy is resolved from the body, the environment, the fixed tunnel or
    the KDL API, normalised to socks5h where applicable, and forwarded to the
    upstream. Proxy credentials are masked in the logs.

    Raises:
        HTTPException: 400 when the key is empty; 502 when the upstream is
            unreachable or answers with a status of 400 or above.
    """
    key = (body.key or "").strip()
    if not key:
        raise HTTPException(status_code=400, detail="key is required")

    proxy = _resolve_proxy(body.Proxy or "", allow_auto=True)
    if proxy == "__tunnel__":
        proxy = _proxy_from_tunnel()
        if proxy:
            logger.info("WakeUpLogin: using proxy from tunnel (socks5h), len=%s", len(proxy))
    elif proxy == "__kdl__":
        proxy = await _proxy_from_kdl()
        if proxy:
            logger.info("WakeUpLogin: using proxy from KDL API, len=%s", len(proxy))
    elif proxy:
        logger.info("WakeUpLogin: using proxy from body/env, len=%s", len(proxy))
    if not proxy:
        logger.info("WakeUpLogin: Proxy 为空,请在 .env 中配置 TUNNEL_PROXY 或 HTTP_PROXY/HTTPS_PROXY 或 KDL,或登录页填写代理")
    proxy = _normalize_proxy_scheme_to_socks5h(proxy)

    ipad_ormac = (body.IpadOrmac or "").strip() or "ipad"
    payload = {
        "Check": body.Check,
        "IpadOrmac": ipad_ormac,
        "Proxy": proxy,
    }
    url = f"{WECHAT_UPSTREAM_BASE_URL.rstrip('/')}/login/WakeUpLogin"
    # The proxy URL carries a password: log only the masked preview.
    logger.info(
        "WakeUpLogin 请求参数: key=%s, url=%s, Proxy=%s, Check=%s, IpadOrmac=%s",
        key, url, _proxy_preview_for_log(proxy), body.Check, ipad_ormac,
    )
    try:
        async with httpx.AsyncClient(trust_env=False, timeout=20.0) as client:
            resp = await client.post(url, params={"key": key}, json=payload)
    except Exception as exc:
        logger.exception("Error calling upstream WakeUpLogin: %s", exc)
        raise HTTPException(
            status_code=502,
            detail={"error": "upstream_connect_error", "detail": str(exc)},
        ) from exc

    if resp.status_code >= 400:
        body_preview = resp.text[:500]
        logger.warning("WakeUpLogin bad response: status=%s, body=%s", resp.status_code, body_preview)
        raise HTTPException(
            status_code=502,
            detail={"error": "upstream_bad_response", "status_code": resp.status_code, "body": body_preview},
        )

    try:
        data = resp.json()
    except Exception:
        # A successful non-JSON answer is passed through as a text preview.
        data = {"ok": True, "text": resp.text[:200]}
    logger.info("WakeUpLogin success: status=%s", resp.status_code)
    return data
|
||
|
||
|
||
@app.post("/auth/qrcode")
async def get_login_qrcode(body: QrCodeRequest):
    """Fetch a login QR code from upstream /login/GetLoginQrCodeNewDirect.

    The proxy is resolved like in the wake-up endpoint. ``Check`` is always
    sent as False and the device type is ``mac`` only when ``force_mac`` is
    set. The upstream response is cached per key together with its Data62,
    and Data62 validation details are added to the returned payload under
    ``_data62_*`` keys. Proxy credentials are masked in the logs.

    Raises:
        HTTPException: 400 when the key is empty; 502 when the upstream is
            unreachable, answers with a status of 400 or above, or returns a
            body that is not JSON.
    """
    key = (body.key or "").strip()
    if not key:
        raise HTTPException(status_code=400, detail="key is required")

    proxy = _resolve_proxy(body.Proxy or "", allow_auto=True)
    if proxy == "__tunnel__":
        proxy = _proxy_from_tunnel()
        if proxy:
            logger.info("GetLoginQrCodeNewDirect: using proxy from tunnel (socks5h), len=%s", len(proxy))
    elif proxy == "__kdl__":
        proxy = await _proxy_from_kdl()
        if proxy:
            logger.info("GetLoginQrCodeNewDirect: using proxy from KDL API, len=%s", len(proxy))
    proxy = _normalize_proxy_scheme_to_socks5h(proxy)
    if proxy:
        logger.info("GetLoginQrCodeNewDirect: proxy=yes, force_mac=%s, IpadOrmac=%s", body.force_mac, "mac" if body.force_mac else (body.IpadOrmac or "ipad"))
    else:
        logger.info("GetLoginQrCodeNewDirect: proxy=empty(未配置则后端自动读 env/KDL),force_mac=%s", body.force_mac)

    payload = body.dict(exclude={"key", "force_mac"})
    payload["Check"] = False
    payload["IpadOrmac"] = "mac" if body.force_mac else ((body.IpadOrmac or "").strip() or "ipad")
    payload["Proxy"] = proxy

    url = f"{WECHAT_UPSTREAM_BASE_URL}/login/GetLoginQrCodeNewDirect"
    # The proxy URL carries a password: log only the masked preview.
    logger.info(
        "GetLoginQrCodeNewDirect 请求参数: key=%s, url=%s, Proxy=%s, Check=%s, IpadOrmac=%s",
        key, url, _proxy_preview_for_log(proxy), False, payload["IpadOrmac"],
    )
    try:
        async with httpx.AsyncClient(trust_env=False, timeout=20.0) as client:
            resp = await client.post(url, params={"key": key}, json=payload)
    except Exception as exc:
        logger.exception("Error calling upstream GetLoginQrCodeNewDirect: %s", exc)
        raise HTTPException(
            status_code=502,
            detail={"error": "upstream_connect_error", "detail": str(exc)},
        ) from exc

    body_text = resp.text[:500]
    if resp.status_code >= 400:
        logger.warning(
            "Upstream GetLoginQrCodeNewDirect bad response: status=%s, body=%s",
            resp.status_code,
            body_text,
        )
        raise HTTPException(
            status_code=502,
            detail={
                "error": "upstream_bad_response",
                "status_code": resp.status_code,
                "body": body_text,
            },
        )

    # body_len is the length of the 500-character preview, not of the full body.
    logger.info(
        "Upstream GetLoginQrCodeNewDirect success: status=%s, body_len=%s",
        resp.status_code,
        len(body_text),
    )
    try:
        data = resp.json()
    except ValueError as exc:
        # A non-JSON success body is an upstream fault, not an internal error.
        logger.warning("Upstream GetLoginQrCodeNewDirect returned non-JSON body: %s", body_text)
        raise HTTPException(
            status_code=502,
            detail={
                "error": "upstream_bad_response",
                "status_code": resp.status_code,
                "body": body_text,
            },
        ) from exc

    # Keep the complete raw Data62 (no cleaning, no truncation); it is only
    # validated so the frontend can print the result to the operation log.
    try:
        data62_full = (data.get("Data62") or "").strip()
        if not data62_full and isinstance(data.get("Data"), dict):
            data62_full = (data.get("Data").get("Data62") or data.get("Data").get("data62") or "").strip()
        qrcode_store[key] = {"data62": data62_full, "response": data}
        data["_data62_stored"] = True
        data["_data62_length"] = len(data62_full)
        check = _validate_data62(data62_full)
        data["_data62_valid"] = check["valid"]
        data["_data62_check"] = check["message"]
        data["_data62_raw_length"] = check["raw_length"]
        data["_data62_clean_length"] = len(data62_full)
        data["_data62_preview"] = check["preview"]
        logger.info(
            "Stored Data62 (full) for key=%s (len=%s), valid=%s, check=%s",
            key, len(data62_full), check["valid"], check["message"],
        )
        # NOTE(review): this writes the complete Data62 to the log file -
        # confirm that is acceptable for the deployment.
        logger.info("Data62 full: %s", data62_full)
    except Exception as e:
        # Caching is best effort; the upstream payload is still returned.
        logger.warning("Store qrcode data for key=%s failed: %s", key, e)
    return data
|
||
|
||
|
||
@app.get("/auth/status")
async def get_online_status(
    key: str = Query(..., description="账号唯一标识"),
):
    """Proxy ``GET /login/GetLoginStatus`` on the WeChat upstream for ``key``.

    Returns the upstream JSON body. When the upstream answers with a body
    that is not JSON, a ``{"Code": <http status>, "Text": <body preview>}``
    dict is returned instead of letting the decode error surface as a 500.

    Raises:
        HTTPException: 400 when ``key`` is empty, 502 when the upstream
            cannot be reached.
    """
    if not key:
        raise HTTPException(status_code=400, detail="key is required")

    url = f"{WECHAT_UPSTREAM_BASE_URL}/login/GetLoginStatus"
    logger.info("GetLoginStatus: key=%s, url=%s", key, url)
    try:
        async with httpx.AsyncClient(trust_env=False, timeout=15.0) as client:
            resp = await client.get(url, params={"key": key})
    except Exception as exc:
        logger.exception("Error calling upstream GetLoginStatus: %s", exc)
        raise HTTPException(status_code=502, detail=f"upstream_error: {exc}") from exc

    # Only a preview is logged so large bodies do not flood the log.
    body_text = resp.text[:500]
    logger.info(
        "Upstream GetLoginStatus response: status=%s, body=%s",
        resp.status_code,
        body_text,
    )
    try:
        return resp.json()
    except ValueError:
        # Non-JSON upstream body (e.g. an HTML error page): degrade to a
        # small structured answer rather than an unhandled exception.
        return {"Code": resp.status_code, "Text": body_text or "Non-JSON response"}
|
||
|
||
|
||
def _extract_clean_ticket(obj: dict) -> Optional[str]:
|
||
"""从扫码状态返回中提取 ticket,去掉乱码(只保留可见 ASCII 到第一个非法字符前)。"""
|
||
if not obj or not isinstance(obj, dict):
|
||
return None
|
||
d = obj.get("Data") if isinstance(obj.get("Data"), dict) else obj
|
||
raw = (
|
||
(d.get("ticket") if d else None)
|
||
or obj.get("ticket")
|
||
or obj.get("Ticket")
|
||
)
|
||
if not raw:
|
||
wvu = obj.get("wechat_verify_url") or ""
|
||
if isinstance(wvu, str) and "ticket=" in wvu:
|
||
raw = wvu.split("ticket=", 1)[1].split("&")[0]
|
||
if not raw or not isinstance(raw, str):
|
||
return None
|
||
clean = []
|
||
for ch in raw:
|
||
code = ord(ch)
|
||
if code == 0xFFFD or code < 32 or code > 126:
|
||
break
|
||
clean.append(ch)
|
||
return "".join(clean) if clean else None
|
||
|
||
|
||
def _clean_data62(s: str) -> str:
|
||
"""去掉 Data62 尾部乱码:以 d000 标识乱码起始,截断保留此前有效内容。"""
|
||
if not s or not isinstance(s, str):
|
||
return ""
|
||
s = s.strip()
|
||
idx = s.find("d0000000000000101000000000000000d0000000000000000000000000000007f")
|
||
if idx != -1:
|
||
return s[:idx].strip()
|
||
idx = s.find("d0000000000000101")
|
||
if idx != -1:
|
||
return s[:idx].strip()
|
||
idx = s.find("d00000000")
|
||
if idx != -1:
|
||
return s[:idx].strip()
|
||
return s
|
||
|
||
|
||
def _validate_data62(raw: str) -> dict:
|
||
"""检查 Data62 完整原始数据是否有效,不清理不截断,仅做格式与长度校验。"""
|
||
raw = (raw or "").strip()
|
||
raw_len = len(raw)
|
||
min_len = 32
|
||
hex_ok = bool(raw and all(c in "0123456789abcdefABCDEF" for c in raw))
|
||
length_ok = raw_len >= min_len
|
||
valid = bool(raw and hex_ok and length_ok)
|
||
if not raw:
|
||
msg = "无 Data62 或为空"
|
||
elif not hex_ok:
|
||
msg = "非十六进制格式"
|
||
elif not length_ok:
|
||
msg = f"长度不足({raw_len} < {min_len})"
|
||
else:
|
||
msg = "完整有效"
|
||
preview = (raw[:80] + "…") if raw_len > 80 else (raw or "")
|
||
return {
|
||
"valid": valid,
|
||
"message": msg,
|
||
"raw_length": raw_len,
|
||
"clean_length": raw_len,
|
||
"preview": preview,
|
||
}
|
||
|
||
|
||
@app.get("/auth/scan-status")
async def check_scan_status(
    key: str = Query(..., description="账号唯一标识"),
):
    """Proxy ``GET /login/CheckLoginStatus`` and attach a slider URL when needed.

    The upstream body is returned as a dict; a non-JSON or non-dict body is
    wrapped as ``{"Code": <http status>, "Text": <preview>}``. When a ticket
    can be extracted from the response, the ticket and the Data62 value
    (cached one first, otherwise the one in this response) are stored in
    ``qrcode_store[key]`` and a local ``slider_url`` is added to the result.

    Raises:
        HTTPException: 400 when ``key`` is empty, 502 when the upstream
            cannot be reached.
    """
    if not key:
        raise HTTPException(status_code=400, detail="key is required")

    url = f"{CHECK_STATUS_BASE_URL}/login/CheckLoginStatus"
    logger.info("CheckLoginStatus: key=%s, url=%s", key, url)
    try:
        async with httpx.AsyncClient(trust_env=False, timeout=15.0) as client:
            resp = await client.get(url, params={"key": key})
    except Exception as exc:
        logger.exception("Error calling upstream CheckLoginStatus: %s", exc)
        raise HTTPException(status_code=502, detail=f"upstream_error: {exc}") from exc

    body_full = resp.text
    logger.info(
        "Upstream CheckLoginStatus response: status=%s, body=%s",
        resp.status_code,
        body_full[:2000],
    )
    try:
        data = resp.json() if body_full.strip() else {}
    except Exception:
        data = {"Code": resp.status_code, "Text": body_full[:500] or "Non-JSON response"}
    if not isinstance(data, dict):
        data = {"Code": resp.status_code, "Text": str(data)[:500]}

    ticket = _extract_clean_ticket(data)
    if ticket:
        # Data62 is used in full: prefer the value cached when the QR code
        # was fetched, otherwise fall back to the one in this response.
        stored = qrcode_store.get(key) or {}
        data62 = (stored.get("data62") or "").strip()
        if not data62:
            # "Data" may be missing or not a dict, and the value itself may
            # not be a string; neither case should abort the request.
            inner = data.get("Data")
            if not isinstance(inner, dict):
                inner = {}
            candidate = data.get("Data62") or inner.get("Data62") or inner.get("data62") or ""
            data62 = candidate.strip() if isinstance(candidate, str) else ""
        # Cache ticket and data62 so /auth/verify-code only needs the code.
        qrcode_store[key] = {
            **stored,
            "ticket": ticket,
            "data62": data62,
        }
        params = {"key": SLIDER_VERIFY_KEY, "ticket": ticket}
        if data62:
            params["data62"] = data62
        data["slider_url"] = f"/auth/slider-form?{urlencode(params)}"
        logger.info(
            "Attached slider_url (slider-form) for key=%s (ticket len=%s, data62 len=%s)",
            key,
            len(ticket),
            len(data62),
        )
    return data
|
||
|
||
|
||
@app.post("/auth/verify-code")
async def verify_login_code(body: VerifyCodeRequest):
    """Submit a phone verification code to the upstream ``/login/VerifyCode``.

    ``key`` and ``code`` come from the request. ``data62`` and ``ticket`` are
    taken from the request when present, otherwise from ``qrcode_store[key]``.
    The upstream JSON is returned; a non-JSON success body is returned as
    ``{"ok": True, "text": <preview>}``.

    Raises:
        HTTPException: 400 for a missing key, code, data62 or ticket; 502 when
            the upstream is unreachable or answers with a status >= 400.
    """
    key = (body.key or "").strip()
    code = (body.code or "").strip()
    if not key:
        raise HTTPException(status_code=400, detail="key is required")
    if not code:
        raise HTTPException(status_code=400, detail="code is required")

    cached = qrcode_store.get(key) or {}
    data62 = (body.data62 or "").strip()
    if not data62:
        data62 = (cached.get("data62") or "").strip()
    ticket = (body.ticket or "").strip()
    if not ticket:
        ticket = (cached.get("ticket") or "").strip()
    if not (data62 and ticket):
        raise HTTPException(
            status_code=400,
            detail="missing data62 or ticket; please get qrcode and check scan status first",
        )

    url = CHECK_STATUS_BASE_URL.rstrip("/") + "/login/VerifyCode"
    logger.info(
        "VerifyCode: key=%s, url=%s, code_len=%s, data62_len=%s, ticket_len=%s",
        key, url, len(code), len(data62), len(ticket),
    )
    form = {"code": code, "data62": data62, "ticket": ticket}
    try:
        async with httpx.AsyncClient(trust_env=False, timeout=20.0) as client:
            resp = await client.post(url, params={"key": key}, json=form)
    except Exception as exc:
        logger.exception("Error calling upstream VerifyCode: %s", exc)
        raise HTTPException(status_code=502, detail=f"upstream_error: {exc}") from exc

    preview = resp.text[:500]
    logger.info("Upstream VerifyCode response: status=%s, body=%s", resp.status_code, preview)
    if resp.status_code >= 400:
        failure = {
            "error": "upstream_bad_response",
            "status_code": resp.status_code,
            "body": preview,
        }
        raise HTTPException(status_code=502, detail=failure)

    try:
        return resp.json()
    except Exception:
        return {"ok": True, "text": preview}
|
||
|
||
|
||
def _slider_form_html(key_val: str, data62_val: str, ticket_val: str) -> str:
|
||
"""本地滑块验证页:与 7765 相同 DOM 结构(#app、keyInput、data62Input、originalTicketInput),加载 7765 的 module 脚本,不用 iframe。"""
|
||
k = html.escape(key_val, quote=True)
|
||
d = html.escape(data62_val, quote=True)
|
||
t = html.escape(ticket_val, quote=True)
|
||
# 脚本走本机代理 /auth/slider-assets/...,避免跨域加载 7765 被 CORS 拦截
|
||
script_src = "/auth/slider-assets/N_jYM_2V.js"
|
||
return f"""<!DOCTYPE html>
|
||
<html lang="zh-CN">
|
||
<head>
|
||
<meta charset="UTF-8">
|
||
<meta name="viewport" content="width=device-width, initial-scale=1">
|
||
<title>滑块验证</title>
|
||
<link href="https://cdn.jsdelivr.net/npm/bootstrap@5.3/dist/css/bootstrap.min.css" rel="stylesheet" crossorigin="anonymous">
|
||
</head>
|
||
<body>
|
||
<div id="app" data-v-app="">
|
||
<div data-v-220db9a4="" class="app-container">
|
||
<div data-v-220db9a4="" class="card">
|
||
<div data-v-220db9a4="" class="card-header">
|
||
<h5 data-v-220db9a4="" class="mb-0">滑块验证</h5>
|
||
</div>
|
||
<div data-v-220db9a4="" class="card-body">
|
||
<div data-v-220db9a4="" class="params-section mb-4">
|
||
<div data-v-220db9a4="" class="row">
|
||
<div data-v-220db9a4="" class="col-12 col-md-6 mb-3">
|
||
<label data-v-220db9a4="" for="keyInput" class="form-label">Key:</label>
|
||
<input data-v-220db9a4="" type="text" class="form-control" id="keyInput" placeholder="请输入key" value="{k}">
|
||
</div>
|
||
<div data-v-220db9a4="" class="col-12 col-md-6 mb-3">
|
||
<label data-v-220db9a4="" for="data62Input" class="form-label">Data62:</label>
|
||
<input data-v-220db9a4="" type="text" class="form-control" id="data62Input" placeholder="请输入data62" value="{d}">
|
||
</div>
|
||
<div data-v-220db9a4="" class="col-12 mb-3">
|
||
<label data-v-220db9a4="" for="originalTicketInput" class="form-label">Original Ticket:</label>
|
||
<input data-v-220db9a4="" type="text" class="form-control" id="originalTicketInput" placeholder="请输入original_ticket" value="{t}">
|
||
</div>
|
||
</div>
|
||
</div>
|
||
<div data-v-220db9a4="" class="text-center mb-3">
|
||
<button data-v-220db9a4="" type="button" class="btn btn-verify btn-lg" disabled="">开始验证</button>
|
||
<div data-v-220db9a4="" class="text-muted mt-2">请先填写完整的参数信息</div>
|
||
</div>
|
||
</div>
|
||
</div>
|
||
</div>
|
||
</div>
|
||
<script type="module" crossorigin src="{script_src}"></script>
|
||
</body>
|
||
</html>"""
|
||
|
||
|
||
@app.get("/auth/slider-assets/{path:path}")
async def slider_asset_proxy(path: str):
    """Proxy ``/assets/<path>`` of the slider service through this server.

    Serving the asset from the local origin avoids cross-origin loading of
    the slider scripts. ``.js`` files are always served as
    ``application/javascript``; other assets keep the upstream
    ``Content-Type`` when one is given.

    Raises:
        HTTPException: 400 for a path containing ``..`` segments, the
            upstream status when it is >= 400, 502 on connection errors.
    """
    # The path comes straight from the request URL; refuse dot-segments so the
    # proxy cannot be steered outside the upstream /assets/ directory.
    if ".." in path.replace("\\", "/").split("/"):
        raise HTTPException(status_code=400, detail="invalid asset path")

    url = f"{SLIDER_VERIFY_BASE_URL.rstrip('/')}/assets/{path}"
    try:
        async with httpx.AsyncClient(trust_env=False, timeout=15.0) as client:
            resp = await client.get(url)
        if resp.status_code >= 400:
            raise HTTPException(status_code=resp.status_code, detail=resp.text[:200])
        if path.endswith(".js"):
            media_type = "application/javascript"
        else:
            # Stylesheets and similar assets are rejected by browsers when
            # served as a generic binary type, so pass the upstream type on.
            media_type = resp.headers.get("content-type") or "application/octet-stream"
        return Response(
            content=resp.content,
            media_type=media_type,
            headers={"Cache-Control": "no-store, no-cache, must-revalidate", "Pragma": "no-cache"},
        )
    except HTTPException:
        # Keep the status chosen above instead of wrapping it as a 502.
        raise
    except Exception as e:
        logger.warning("Slider asset proxy error: %s", e)
        raise HTTPException(status_code=502, detail=str(e)) from e
|
||
|
||
|
||
@app.get("/auth/slider-form", response_class=HTMLResponse)
async def slider_form(
    key: str = Query(..., description="Key(提交到第三方滑块)"),
    data62: str = Query("", description="Data62"),
    ticket: str = Query(..., description="Original Ticket"),
):
    """Serve the local slider-verification page pre-filled from the query.

    ``data62`` is passed on in full; only surrounding whitespace is removed.
    """
    page = _slider_form_html(key, data62.strip(), ticket)
    return HTMLResponse(content=page)
|
||
|
||
|
||
# ---------- 滑块验证提交接口(代理 7765) ----------
|
||
# 7765 页面提交为 GET:action=SLIDER_VERIFY_BASE_URL,参数 key、data62、original_ticket
|
||
@app.get("/api/slider-verify")
async def api_slider_verify_get(
    key: str = Query(..., description="Key"),
    data62: str = Query("", description="Data62"),
    original_ticket: str = Query("", description="Original Ticket(与 ticket 二选一)"),
    ticket: str = Query("", description="Original Ticket(与 original_ticket 二选一)"),
):
    """Forward a slider submission to the slider service as a GET request.

    Either ``original_ticket`` or ``ticket`` must be supplied; the value is
    sent upstream as ``original_ticket`` together with ``key`` and ``data62``.
    The upstream JSON is returned, or a small status dict when the upstream
    body is not JSON.

    Raises:
        HTTPException: 400 when neither ticket parameter is given, 502 when
            the upstream request fails.
    """
    # Both ticket parameters are optional at the query level so that a caller
    # sending only ``ticket`` reaches this fallback instead of a 422.
    ticket_val = original_ticket or ticket
    if not ticket_val:
        raise HTTPException(status_code=400, detail="original_ticket or ticket required")
    url = SLIDER_VERIFY_BASE_URL.rstrip("/") + "/"
    params = {"key": key, "data62": (data62 or "").strip(), "original_ticket": ticket_val}
    try:
        async with httpx.AsyncClient(trust_env=False, timeout=30.0) as client:
            resp = await client.get(url, params=params)
        # Return the parsed body when it is JSON, a summary otherwise.
        try:
            return resp.json()
        except Exception:
            return {"ok": resp.status_code == 200, "status_code": resp.status_code, "text": resp.text[:500]}
    except Exception as e:
        logger.warning("Slider verify upstream error: %s", e)
        raise HTTPException(status_code=502, detail=f"slider_upstream_error: {e}") from e
|
||
|
||
|
||
# Request body of POST /api/slider-verify.
class SliderVerifyBody(BaseModel):
    key: str
    data62: Optional[str] = ""
    # Either of the two ticket fields may carry the ticket; the handler
    # prefers original_ticket and falls back to ticket.
    original_ticket: Optional[str] = None
    ticket: Optional[str] = None
|
||
|
||
|
||
@app.post("/api/slider-verify")
async def api_slider_verify_post(body: SliderVerifyBody):
    """Forward a POSTed slider submission to the slider service as a GET.

    The ticket is read from ``original_ticket`` or, failing that, ``ticket``.
    Returns the upstream JSON, or a status summary for a non-JSON body.

    Raises:
        HTTPException: 400 when no ticket is supplied, 502 when the upstream
            request fails.
    """
    chosen_ticket = body.original_ticket or body.ticket
    if not chosen_ticket:
        raise HTTPException(status_code=400, detail="original_ticket or ticket required")

    target = f"{SLIDER_VERIFY_BASE_URL.rstrip('/')}/"
    query = {
        "key": body.key,
        "data62": (body.data62 or "").strip(),
        "original_ticket": chosen_ticket,
    }
    try:
        async with httpx.AsyncClient(trust_env=False, timeout=30.0) as client:
            upstream = await client.get(target, params=query)
        try:
            return upstream.json()
        except Exception:
            summary = {
                "ok": upstream.status_code == 200,
                "status_code": upstream.status_code,
                "text": upstream.text[:500],
            }
            return summary
    except Exception as e:
        logger.warning("Slider verify upstream error: %s", e)
        raise HTTPException(status_code=502, detail=f"slider_upstream_error: {e}") from e
|
||
|
||
|
||
# ---------- R1-2 客户画像 / R1-3 定时问候 / R1-4 分群推送 / 消息与发送 ----------
|
||
|
||
# Request body of POST /api/customers (create or update a customer profile).
class CustomerCreate(BaseModel):
    key: str
    wxid: str
    remark_name: Optional[str] = ""
    region: Optional[str] = ""
    age: Optional[str] = ""
    gender: Optional[str] = ""
    level: Optional[str] = ""  # purchasing tier
    tags: Optional[List[str]] = None
|
||
|
||
|
||
# Request body of POST /api/greeting-tasks.
class GreetingTaskCreate(BaseModel):
    key: str
    name: str
    send_time: str  # ISO-format trigger time, e.g. 2026-03-11T14:30:00; must be in the future
    customer_tags: Optional[List[str]] = None
    template: str
    use_qwen: Optional[bool] = False
|
||
|
||
|
||
# Request body of POST /api/product-tags.
class ProductTagCreate(BaseModel):
    key: str
    name: str
|
||
|
||
|
||
# Request body of POST /api/push-groups; omitted id lists are stored as empty lists.
class PushGroupCreate(BaseModel):
    key: str
    name: str
    customer_ids: Optional[List[str]] = None
    tag_ids: Optional[List[str]] = None
|
||
|
||
|
||
# Request body of POST /api/push-tasks.
class PushTaskCreate(BaseModel):
    key: str
    product_tag_id: str
    group_id: str
    content: str
    send_at: Optional[str] = None
|
||
|
||
|
||
# Request body of POST /api/send-message (one text message to one recipient).
class SendMessageBody(BaseModel):
    key: str
    to_user_name: str
    content: str
|
||
|
||
|
||
# One recipient/text pair inside a BatchSendBody.
class BatchSendItem(BaseModel):
    to_user_name: str
    content: str
|
||
|
||
|
||
# Request body of POST /api/send-batch.
class BatchSendBody(BaseModel):
    key: str
    items: List[BatchSendItem]
|
||
|
||
|
||
# Request body of POST /api/send-image.
class SendImageBody(BaseModel):
    key: str
    to_user_name: str
    image_content: str  # image as base64 or URL, whichever the upstream expects
    text_content: Optional[str] = ""
    at_wxid_list: Optional[List[str]] = None
|
||
|
||
|
||
# Request body carrying a prompt and an optional system message for text generation.
class QwenGenerateBody(BaseModel):
    prompt: str
    system: Optional[str] = None
|
||
|
||
|
||
@app.get("/api/customers")
async def api_list_customers(key: str = Query(..., description="账号 key")):
    """Return the customers stored for ``key`` as ``{"items": [...]}``."""
    customers = store.list_customers(key)
    return {"items": customers}
|
||
|
||
|
||
@app.post("/api/customers")
async def api_upsert_customer(body: CustomerCreate):
    """Create or update a customer profile and return the stored row.

    Missing text fields are passed to the store as empty strings; ``tags`` is
    passed through unchanged.
    """
    text_fields = ("remark_name", "region", "age", "gender", "level")
    profile = {name: getattr(body, name) or "" for name in text_fields}
    return store.upsert_customer(body.key, body.wxid, tags=body.tags, **profile)
|
||
|
||
|
||
@app.get("/api/customers/{customer_id}")
async def api_get_customer(customer_id: str):
    """Return one customer row.

    Raises:
        HTTPException: 404 when the store has no such customer.
    """
    customer = store.get_customer(customer_id)
    if customer:
        return customer
    raise HTTPException(status_code=404, detail="customer not found")
|
||
|
||
|
||
@app.get("/api/customer-tags")
async def api_list_customer_tags(key: str = Query(..., description="账号 key")):
    """Return the customer tags known for ``key`` as ``{"tags": [...]}``.

    Intended for drop-down selection, e.g. when configuring scheduled tasks.
    """
    tags = store.list_customer_tags(key)
    return {"tags": tags}
|
||
|
||
|
||
@app.delete("/api/customers/{customer_id}")
async def api_delete_customer(customer_id: str):
    """Delete a customer.

    Raises:
        HTTPException: 404 when the store reports nothing was deleted.
    """
    removed = store.delete_customer(customer_id)
    if removed:
        return {"ok": True}
    raise HTTPException(status_code=404, detail="customer not found")
|
||
|
||
|
||
@app.get("/api/greeting-tasks")
async def api_list_greeting_tasks(key: str = Query(..., description="账号 key")):
    """Return the greeting tasks stored for ``key`` as ``{"items": [...]}``."""
    tasks = store.list_greeting_tasks(key)
    return {"items": tasks}
|
||
|
||
|
||
def _parse_send_time(s: str) -> Optional[datetime]:
|
||
"""解析 ISO 时间字符串,返回 datetime(无时区)。"""
|
||
try:
|
||
if "T" in s:
|
||
return datetime.fromisoformat(s.replace("Z", "+00:00")[:19])
|
||
return datetime.strptime(s[:19], "%Y-%m-%d %H:%M:%S")
|
||
except Exception:
|
||
return None
|
||
|
||
|
||
@app.post("/api/greeting-tasks")
async def api_create_greeting_task(body: GreetingTaskCreate):
    """Create a scheduled greeting task and return the stored row.

    The original ``send_time`` string is what gets stored; it is parsed here
    only for validation.

    Raises:
        HTTPException: 400 when ``send_time`` cannot be parsed or is not
            later than the current local time.
    """
    trigger_at = _parse_send_time(body.send_time)
    if not trigger_at:
        raise HTTPException(status_code=400, detail="触发时间格式无效,请使用 日期+时分秒 选择器")
    if datetime.now() >= trigger_at:
        raise HTTPException(status_code=400, detail="触发时间必须是未来时间,请重新选择")

    options = {
        "customer_tags": body.customer_tags or [],
        "template": body.template,
        "use_qwen": body.use_qwen or False,
    }
    return store.create_greeting_task(body.key, body.name, body.send_time, **options)
|
||
|
||
|
||
@app.patch("/api/greeting-tasks/{task_id}")
async def api_update_greeting_task(task_id: str, body: dict):
    """Update selected fields of a greeting task and return the updated row.

    Only ``name``, ``send_time``, ``customer_tags``, ``template``,
    ``use_qwen`` and ``enabled`` are forwarded to the store; other keys in the
    body are ignored.

    Raises:
        HTTPException: 400 when a supplied ``send_time`` is unparsable or not
            in the future, 404 when the store returns no row.
    """
    if "send_time" in body:
        trigger_at = _parse_send_time(body["send_time"])
        if not trigger_at:
            raise HTTPException(status_code=400, detail="触发时间格式无效")
        if datetime.now() >= trigger_at:
            raise HTTPException(status_code=400, detail="触发时间必须是未来时间")

    editable = ("name", "send_time", "customer_tags", "template", "use_qwen", "enabled")
    changes = {}
    for field, value in body.items():
        if field in editable:
            changes[field] = value

    updated = store.update_greeting_task(task_id, **changes)
    if updated:
        return updated
    raise HTTPException(status_code=404, detail="task not found")
|
||
|
||
|
||
@app.delete("/api/greeting-tasks/{task_id}")
async def api_delete_greeting_task(task_id: str):
    """Delete a greeting task.

    Raises:
        HTTPException: 404 when the store reports nothing was deleted.
    """
    removed = store.delete_greeting_task(task_id)
    if removed:
        return {"ok": True}
    raise HTTPException(status_code=404, detail="task not found")
|
||
|
||
|
||
@app.get("/api/product-tags")
async def api_list_product_tags(key: str = Query(..., description="账号 key")):
    """Return the product tags stored for ``key`` as ``{"items": [...]}``."""
    tags = store.list_product_tags(key)
    return {"items": tags}
|
||
|
||
|
||
@app.post("/api/product-tags")
async def api_create_product_tag(body: ProductTagCreate):
    """Create a product tag and return whatever the store returns for it."""
    created = store.create_product_tag(body.key, body.name)
    return created
|
||
|
||
|
||
@app.delete("/api/product-tags/{tag_id}")
async def api_delete_product_tag(tag_id: str):
    """Delete a product tag.

    Raises:
        HTTPException: 404 when the store reports nothing was deleted.
    """
    removed = store.delete_product_tag(tag_id)
    if removed:
        return {"ok": True}
    raise HTTPException(status_code=404, detail="tag not found")
|
||
|
||
|
||
@app.get("/api/push-groups")
async def api_list_push_groups(key: str = Query(..., description="账号 key")):
    """Return the push groups stored for ``key`` as ``{"items": [...]}``."""
    groups = store.list_push_groups(key)
    return {"items": groups}
|
||
|
||
|
||
@app.post("/api/push-groups")
async def api_create_push_group(body: PushGroupCreate):
    """Create a push group; omitted id lists are stored as empty lists."""
    customer_ids = body.customer_ids or []
    tag_ids = body.tag_ids or []
    return store.create_push_group(body.key, body.name, customer_ids, tag_ids)
|
||
|
||
|
||
@app.patch("/api/push-groups/{group_id}")
async def api_update_push_group(group_id: str, body: dict):
    """Update a push group's name and member lists and return the row.

    ``name``, ``customer_ids`` and ``tag_ids`` are always forwarded to the
    store; a key absent from the body is forwarded as ``None``.

    Raises:
        HTTPException: 404 when the store returns no row.
    """
    changes = {field: body.get(field) for field in ("name", "customer_ids", "tag_ids")}
    updated = store.update_push_group(group_id, **changes)
    if updated:
        return updated
    raise HTTPException(status_code=404, detail="group not found")
|
||
|
||
|
||
@app.delete("/api/push-groups/{group_id}")
async def api_delete_push_group(group_id: str):
    """Delete a push group.

    Raises:
        HTTPException: 404 when the store reports nothing was deleted.
    """
    removed = store.delete_push_group(group_id)
    if removed:
        return {"ok": True}
    raise HTTPException(status_code=404, detail="group not found")
|
||
|
||
|
||
@app.get("/api/push-tasks")
async def api_list_push_tasks(key: str = Query(..., description="账号 key"), limit: int = Query(100, le=500)):
    """Return up to ``limit`` push tasks for ``key`` as ``{"items": [...]}``."""
    tasks = store.list_push_tasks(key, limit=limit)
    return {"items": tasks}
|
||
|
||
|
||
@app.post("/api/push-tasks")
async def api_create_push_task(body: PushTaskCreate):
    """Create a push task and return whatever the store returns for it."""
    created = store.create_push_task(
        body.key,
        body.product_tag_id,
        body.group_id,
        body.content,
        body.send_at,
    )
    return created
|
||
|
||
|
||
@app.get("/api/messages")
async def api_list_messages(key: str = Query(..., description="账号 key"), limit: int = Query(100, le=500)):
    """Return synced messages for ``key`` with display names attached.

    Each message gets ``FromDisplayName`` and ``ToDisplayName``: the
    ``nick_name`` found in the contact index for that wxid, or the wxid
    itself when no contact entry (or nickname) exists. A failure while
    building the contact index is tolerated and treated as an empty index.
    """
    messages = store.list_sync_messages(key, limit=limit)
    try:
        contact_index = await _build_contact_index(key)
    except Exception:
        contact_index = {}

    def display_name(wxid):
        # Fall back to the raw wxid whenever no usable contact entry exists.
        info = contact_index.get(wxid)
        if isinstance(info, dict) and info:
            return (info.get("nick_name") or wxid).strip()
        return wxid

    for message in messages:
        sender = (message.get("FromUserName") or message.get("from") or "").strip()
        recipient = (message.get("ToUserName") or message.get("to") or "").strip()
        message["FromDisplayName"] = display_name(sender)
        message["ToDisplayName"] = display_name(recipient)
    return {"items": messages}
|
||
|
||
|
||
@app.get("/api/callback-status")
async def api_callback_status(key: Optional[str] = Query(None, description="账号 key,传入时会向 7006 重新注册 SetCallback 并返回是否成功")):
    """Report the message-callback configuration.

    Returns whether ``CALLBACK_BASE_URL`` is set, the resulting callback URL
    (``None`` when unset) and, when a non-blank ``key`` is given, the result
    of re-registering the callback for that key (``None`` otherwise).
    """
    configured = bool(CALLBACK_BASE_URL)
    if configured:
        callback_url = f"{CALLBACK_BASE_URL.rstrip('/')}/api/callback/wechat-message"
    else:
        callback_url = ""

    account_key = key.strip() if key else ""
    registered: Optional[bool] = None
    if account_key:
        registered = await _register_message_callback(account_key)

    return {
        "configured": configured,
        "callback_url": callback_url or None,
        "registered": registered,
    }
|
||
|
||
|
||
@app.post("/api/callback/wechat-message")
async def api_callback_wechat_message(request: Request, key: Optional[str] = Query(None, description="账号 key(7006 回调时可能带在 query)")):
    """Receive a real-time message callback and feed it into ``_on_ws_message``.

    The account key is taken from the query string or, failing that, from
    ``key`` / ``Key`` in a dict body. The raw body is appended to the callback
    log, then normalised:

    * ``{"message": {...}}`` or ``{"message": [...]}`` - each dict message is
      passed through ``_normalize_callback_message``;
    * ``{"Data": {...}}`` holding a single message - normalised the same way;
    * ``{"Data": [...]}`` - the list is forwarded as is;
    * anything else - the body is forwarded unchanged.

    Always answers HTTP 200: ``{"ok": False, "error": "missing key"}`` when
    no key is available, ``{"ok": True}`` otherwise (processing errors are
    logged, not raised).
    """
    try:
        body = await request.json()
    except Exception:
        body = {}
    # Log the raw callback for troubleshooting, truncated to keep logs small.
    try:
        logger.info("callback/wechat-message raw body: %s", str(body)[:1000])
    except Exception:
        pass

    # The body may be any JSON value; only a dict can carry the key, and the
    # key itself may not be a string.
    body_key: Any = ""
    if isinstance(body, dict):
        body_key = body.get("key") or body.get("Key") or ""
    k = str(key or body_key or "").strip()
    if not k:
        logger.warning("callback/wechat-message: missing key in query and body")
        return JSONResponse(content={"ok": False, "error": "missing key"}, status_code=200)

    # Persist the raw body so callbacks can be traced and counted later.
    try:
        store.append_callback_log(k, body if isinstance(body, dict) else {"raw": str(body)})
    except Exception as le:
        logger.warning("callback_log append failed: %s", le)

    try:
        payload: Any = body
        if isinstance(body, dict) and body.get("message") is not None:
            msg = body.get("message")
            if isinstance(msg, list):
                # A list of messages is normalised item by item.
                normalized_list = []
                for m in msg:
                    if isinstance(m, dict):
                        n = _normalize_callback_message({"message": m})
                        if n:
                            normalized_list.append(n)
                if normalized_list:
                    payload = normalized_list
            else:
                normalized = _normalize_callback_message(body)
                if normalized:
                    payload = [normalized]
        elif isinstance(body, dict):
            inner = body.get("Data") or body.get("data")
            if isinstance(inner, dict) and (inner.get("from_user_name") is not None or inner.get("FromUserName") is not None):
                # Data holding a single message is normalised as well so
                # FromUserName / Content etc. have one spelling downstream.
                normalized = _normalize_callback_message({"message": inner})
                if normalized:
                    payload = [normalized]
            elif isinstance(inner, list):
                payload = inner
        _on_ws_message(k, payload)
        logger.info("callback message saved to sync_messages, key=%s", k[:8] + "..." if len(k) > 8 else k)
    except Exception as e:
        logger.exception("callback/wechat-message key=%s: %s", k[-4:] if len(k) >= 4 else "****", e)
    return {"ok": True}
|
||
|
||
|
||
async def _send_message_upstream(key: str, to_user_name: str, content: str) -> dict:
    """Send one text message through the upstream and record it as sent.

    Returns the upstream JSON, or ``{"ok": True, "raw": <preview>}`` when the
    body is not JSON.

    Raises:
        HTTPException: 502 when the upstream answers with a status >= 400.
    """
    endpoint = WECHAT_UPSTREAM_BASE_URL.rstrip("/") + SEND_MSG_PATH
    message = {"ToUserName": to_user_name, "MsgType": 1, "TextContent": content}
    async with httpx.AsyncClient(trust_env=False, timeout=15.0) as client:
        resp = await client.post(endpoint, params={"key": key}, json={"MsgItem": [message]})

    if resp.status_code >= 400:
        body_preview = (resp.text or "")[:400]
        logger.warning("Send message upstream %s: %s", resp.status_code, body_preview)
        raise HTTPException(
            status_code=502,
            detail=f"upstream_returned_{resp.status_code}: {body_preview}",
        )

    # Recorded only after the upstream accepted the request.
    store.append_sent_message(key, to_user_name, content)
    try:
        reply = resp.json()
    except Exception:
        reply = {"ok": True, "raw": resp.text[:500]}
    return reply
|
||
|
||
|
||
async def _send_batch_upstream(key: str, items: List[dict]) -> dict:
    """Send several text messages in a single upstream request.

    Each item may use ``to_user_name`` / ``content`` or ``ToUserName`` /
    ``TextContent``. Items without a recipient are dropped. Every message
    that was sent is recorded in the store afterwards.

    Returns the upstream JSON, or ``{"ok": True, "sent": <count>, "raw":
    <preview>}`` when the body is not JSON.

    Raises:
        HTTPException: 400 when no item has a recipient, 502 when the
            upstream answers with a status >= 400.
    """
    endpoint = WECHAT_UPSTREAM_BASE_URL.rstrip("/") + SEND_MSG_PATH
    pairs = (
        (
            (entry.get("to_user_name") or entry.get("ToUserName") or "").strip(),
            (entry.get("content") or entry.get("TextContent") or "").strip(),
        )
        for entry in items
    )
    msg_items = [
        {"ToUserName": recipient, "MsgType": 1, "TextContent": text}
        for recipient, text in pairs
        if recipient
    ]
    if not msg_items:
        raise HTTPException(status_code=400, detail="items 中至少需要一条有效 to_user_name 与 content")

    async with httpx.AsyncClient(trust_env=False, timeout=30.0) as client:
        resp = await client.post(endpoint, params={"key": key}, json={"MsgItem": msg_items})

    if resp.status_code >= 400:
        body_preview = (resp.text or "")[:400]
        logger.warning("Batch send upstream %s: %s", resp.status_code, body_preview)
        raise HTTPException(
            status_code=502,
            detail=f"upstream_returned_{resp.status_code}: {body_preview}",
        )

    for sent in msg_items:
        store.append_sent_message(key, sent["ToUserName"], sent.get("TextContent", ""))
    try:
        reply = resp.json()
    except Exception:
        reply = {"ok": True, "sent": len(msg_items), "raw": resp.text[:500]}
    return reply
|
||
|
||
|
||
async def _send_image_upstream(key: str, to_user_name: str, image_content: str,
                               text_content: Optional[str] = "",
                               at_wxid_list: Optional[List[str]] = None) -> dict:
    """Send one image message through the image upstream and record it.

    The request carries a single ``MsgItem`` with ``ImageContent``,
    ``MsgType`` (``IMAGE_MSG_TYPE``), ``TextContent``, ``AtWxIDList`` and
    ``ToUserName``. The sent-message record is the ``[图片]`` marker, followed
    by the text when there is one.

    Returns the upstream JSON, or ``{"ok": True, "raw": <preview>}`` when the
    body is not JSON.

    Raises:
        HTTPException: 502 when the upstream answers with a status >= 400.
    """
    endpoint = SEND_IMAGE_UPSTREAM_BASE_URL.rstrip("/") + SEND_IMAGE_PATH
    mentions = list(at_wxid_list) if at_wxid_list else []
    image_item = {
        "AtWxIDList": mentions,
        "ImageContent": image_content or "",
        "MsgType": IMAGE_MSG_TYPE,
        "TextContent": text_content or "",
        "ToUserName": to_user_name,
    }
    async with httpx.AsyncClient(trust_env=False, timeout=15.0) as client:
        resp = await client.post(endpoint, params={"key": key}, json={"MsgItem": [image_item]})

    if resp.status_code >= 400:
        body_preview = (resp.text or "")[:400]
        logger.warning("Send image upstream %s: %s", resp.status_code, body_preview)
        raise HTTPException(
            status_code=502,
            detail=f"upstream_returned_{resp.status_code}: {body_preview}",
        )

    caption = (" " + text_content) if text_content else ""
    store.append_sent_message(key, to_user_name, "[图片]" + caption)
    try:
        reply = resp.json()
    except Exception:
        reply = {"ok": True, "raw": resp.text[:500]}
    return reply
|
||
|
||
|
||
@app.post("/api/send-message")
async def api_send_message(body: SendMessageBody):
    """Send one text message and return the upstream response.

    Raises:
        HTTPException: whatever the sender raised, or 502 wrapping any other
            error.
    """
    try:
        reply = await _send_message_upstream(body.key, body.to_user_name, body.content)
    except HTTPException:
        # Already carries the intended status; do not wrap it again.
        raise
    except Exception as exc:
        logger.exception("Send message upstream error: %s", exc)
        raise HTTPException(status_code=502, detail=f"upstream_error: {exc}") from exc
    return reply
|
||
|
||
|
||
@app.post("/api/send-batch")
async def api_send_batch(body: BatchSendBody):
    """Send text messages to several recipients in one request.

    Raises:
        HTTPException: whatever the batch sender raised, or 502 wrapping any
            other error.
    """
    entries = []
    for item in body.items:
        entries.append({"to_user_name": item.to_user_name, "content": item.content})
    try:
        reply = await _send_batch_upstream(body.key, entries)
    except HTTPException:
        raise
    except Exception as exc:
        logger.exception("Batch send error: %s", exc)
        raise HTTPException(status_code=502, detail=f"upstream_error: {exc}") from exc
    return reply
|
||
|
||
|
||
@app.post("/api/send-image")
async def api_send_image(body: SendImageBody):
    """Send one image message; the body fields map onto the upstream MsgItem.

    Raises:
        HTTPException: whatever the image sender raised, or 502 wrapping any
            other error.
    """
    extras = {
        "text_content": body.text_content or "",
        "at_wxid_list": body.at_wxid_list,
    }
    try:
        reply = await _send_image_upstream(body.key, body.to_user_name, body.image_content, **extras)
    except HTTPException:
        raise
    except Exception as exc:
        logger.exception("Send image error: %s", exc)
        raise HTTPException(status_code=502, detail=f"upstream_error: {exc}") from exc
    return reply
|
||
|
||
|
||
def _log_contact_list_response_structure(raw: dict) -> None:
    """Log the shape of a contact-list response from which nothing was extracted.

    Logs the top-level keys, the keys (or type name) of ``Data`` / ``data``,
    a preview of the first five ``Data`` entries and, when ``Data.ContactList``
    is a dict, its keys plus the length and a sample of the first username
    list found in it. A ``raw`` that is not a dict is logged as having no
    keys instead of raising.
    """
    is_mapping = isinstance(raw, dict)
    keys_top = list(raw.keys()) if is_mapping else []
    # Purely diagnostic: an unexpected payload type must not crash the caller.
    data = (raw.get("Data") or raw.get("data")) if is_mapping else None
    keys_data = list(data.keys()) if isinstance(data, dict) else (type(data).__name__ if data is not None else "None")
    logger.info(
        "GetContactList response structure (no items extracted): top_level_keys=%s, Data_keys=%s",
        keys_top,
        keys_data,
    )
    if not isinstance(data, dict):
        return

    for k, v in list(data.items())[:5]:
        preview = str(v)[:80] if v is not None else "null"
        logger.info("  Data.%s: %s", k, preview)

    # Data.ContactList is commonly an object holding a contactUsernameList array.
    cl = data.get("ContactList") or data.get("contactList")
    if isinstance(cl, dict):
        cl_keys = list(cl.keys())
        logger.info("  Data.ContactList keys: %s", cl_keys)
        for uk in ("contactUsernameList", "ContactUsernameList", "UserNameList", "userNameList", "usernameList"):
            arr = cl.get(uk)
            if isinstance(arr, list):
                logger.info("  Data.ContactList.%s length=%s, sample=%s", uk, len(arr), arr[:3] if arr else [])
                break
|
||
|
||
|
||
def _unwrap_wechat_field(v: Any) -> Any:
|
||
"""上游字段有时为 {'str': 'xxx'} 或 {'len': 0} 这种包装,这里尝试取出内部值。"""
|
||
if isinstance(v, dict):
|
||
if "str" in v:
|
||
return v.get("str")
|
||
if "len" in v and len(v) == 1:
|
||
return v.get("len")
|
||
return v
|
||
|
||
|
||
def _normalize_callback_message(raw: dict) -> dict:
    """Convert a callback message into the sync-message style field names.

    ``raw`` is either the whole callback (``{"key": ..., "message": {...},
    "type": "message"}``) or the message dict itself. Snake-case and
    Pascal-case spellings are both accepted, and wrapped values such as
    ``{"str": "..."}`` are unwrapped for sender, recipient and content.

    Example input::

        {
            "key": "HBpEnbtj9BJZ",
            "message": {
                "msg_id": 126545176,
                "from_user_name": {"str": "zhang499142409"},
                "to_user_name": {"str": "wxid_xxx"},
                "msg_type": 1,
                "content": {"str": "测试"},
            },
            "type": "message",
        }

    Returns:
        A dict with ``MsgId``, ``FromUserName``, ``ToUserName``, ``Content``,
        ``MsgType`` and ``CreateTime`` followed by every other original field,
        or ``{}`` when the message part is not a dict.
    """
    msg = raw.get("message") or raw
    if not isinstance(msg, dict):
        return {}

    def first_of(*names):
        # Same result as chaining msg.get(a) or msg.get(b) ...: the first
        # truthy value, otherwise whatever the last name yields.
        for name in names[:-1]:
            value = msg.get(name)
            if value:
                return value
        return msg.get(names[-1])

    sender = _unwrap_wechat_field(first_of("from_user_name", "FromUserName"))
    recipient = _unwrap_wechat_field(first_of("to_user_name", "ToUserName"))
    text = _unwrap_wechat_field(first_of("content", "Content"))
    kind = first_of("msg_type", "MsgType")
    created = first_of("create_time", "CreateTime")

    normalized = {
        "MsgId": first_of("msg_id", "MsgId", "new_msg_id"),
        "FromUserName": sender or "",
        "ToUserName": recipient or "",
        "Content": text or "",
        "MsgType": kind,
        "CreateTime": created,
    }

    # Carry the remaining raw fields along for debugging / richer display.
    consumed = (
        "msg_id", "MsgId", "new_msg_id",
        "from_user_name", "FromUserName",
        "to_user_name", "ToUserName",
        "content", "Content",
        "msg_type", "MsgType",
        "create_time", "CreateTime",
    )
    normalized.update(
        (field, value) for field, value in msg.items() if field not in consumed
    )
    return normalized
|
||
|
||
|
||
def _normalize_contact_list(raw: Any) -> List[dict]:
|
||
"""将上游 GetContactList 多种返回格式统一为 [ { wxid, remark_name, ... } ]。"""
|
||
items: Any = []
|
||
if isinstance(raw, list):
|
||
items = raw
|
||
elif isinstance(raw, dict):
|
||
data = raw.get("Data") or raw.get("data") or raw
|
||
if isinstance(data, list):
|
||
items = data
|
||
elif isinstance(data, dict):
|
||
contact_list = (
|
||
data.get("ContactList")
|
||
or data.get("contactList")
|
||
or data.get("WxcontactList")
|
||
or data.get("wxcontactList")
|
||
or data.get("CachedContactList")
|
||
)
|
||
# 7006 格式:ContactList 为对象,联系人 id 在 contactUsernameList 等数组里
|
||
if isinstance(contact_list, dict):
|
||
username_list = (
|
||
contact_list.get("contactUsernameList")
|
||
or contact_list.get("ContactUsernameList")
|
||
or contact_list.get("UserNameList")
|
||
or contact_list.get("userNameList")
|
||
or contact_list.get("usernameList")
|
||
or []
|
||
)
|
||
if isinstance(username_list, list) and username_list:
|
||
items = [{"wxid": (x if isinstance(x, str) else str(x)), "remark_name": (x if isinstance(x, str) else str(x))} for x in username_list]
|
||
else:
|
||
items = []
|
||
else:
|
||
items = contact_list if isinstance(contact_list, list) else (
|
||
data.get("List") or data.get("list") or data.get("items")
|
||
or data.get("Friends") or data.get("friends")
|
||
or data.get("MemberList") or data.get("memberList") or []
|
||
)
|
||
items = items or raw.get("items") or raw.get("list") or raw.get("List") or raw.get("ContactList") or raw.get("WxcontactList") or []
|
||
result = []
|
||
for x in items:
|
||
if isinstance(x, str):
|
||
result.append({"wxid": x, "remark_name": x})
|
||
continue
|
||
if not isinstance(x, dict):
|
||
continue
|
||
wxid = _unwrap_wechat_field(
|
||
x.get("wxid")
|
||
or x.get("Wxid")
|
||
or x.get("UserName")
|
||
or x.get("userName")
|
||
or x.get("Alias")
|
||
) or ""
|
||
remark = _unwrap_wechat_field(
|
||
x.get("remark_name")
|
||
or x.get("RemarkName")
|
||
or x.get("NickName")
|
||
or x.get("nickName")
|
||
or x.get("DisplayName")
|
||
) or wxid
|
||
result.append({"wxid": str(wxid).strip(), "remark_name": str(remark).strip()})
|
||
return result
|
||
|
||
|
||
async def _fetch_all_contact_usernames(key: str) -> List[str]:
    """Page through ``/friend/GetContactList`` and collect every contact UserName.

    The result is de-duplicated in first-seen order and is only meant to feed
    the later GetContactDetailsList calls.

    Args:
        key: Account key sent upstream as the ``key`` query parameter.

    Returns:
        The collected UserNames. Any exception is logged and whatever was
        gathered so far is returned.
    """
    endpoint = f"{CHECK_STATUS_BASE_URL.rstrip('/')}/friend/GetContactList"
    chat_key = "CurrentChatRoomContactSeq"
    wx_key = "CurrentWxcontactSeq"
    usernames: List[str] = []
    seen: set = set()
    body: dict = {chat_key: 0, wx_key: 0}
    max_rounds = 50  # hard stop so a misbehaving upstream cannot loop forever

    def _ids_from_contact_list_object(payload: dict) -> List[dict]:
        # 7006 format: Data.ContactList is an object holding the ids in one of several arrays.
        data = payload.get("Data") or payload.get("data") or {}
        if not isinstance(data, dict):
            return []
        cl = data.get("ContactList") or data.get("contactList")
        if not isinstance(cl, dict):
            return []
        ul = (
            cl.get("contactUsernameList")
            or cl.get("ContactUsernameList")
            or cl.get("UserNameList")
            or cl.get("userNameList")
            or cl.get("usernameList")
            or []
        )
        if not (isinstance(ul, list) and ul):
            return []
        return [{"wxid": (x if isinstance(x, str) else str(x)), "remark_name": ""} for x in ul]

    try:
        async with httpx.AsyncClient(trust_env=False, timeout=30.0) as client:
            for round_num in range(max_rounds):
                resp = await client.post(endpoint, params={"key": key}, json=body)
                if resp.status_code >= 400:
                    logger.warning("GetContactList(round=%s) %s: %s", round_num + 1, resp.status_code, resp.text[:200])
                    break

                raw = resp.json()
                chunk = _normalize_contact_list(raw)
                if not chunk and isinstance(raw, dict):
                    chunk = _normalize_contact_list(raw.get("Data") or raw.get("data") or raw)

                # First round only: if normalization found nothing, read the id
                # arrays under Data.ContactList directly.
                if not chunk and round_num == 0 and isinstance(raw, dict):
                    recovered = _ids_from_contact_list_object(raw)
                    if recovered:
                        chunk = recovered
                        logger.info("GetContactList fallback from Data.ContactList.* list, count=%s", len(chunk))
                if not chunk:
                    _log_contact_list_response_structure(raw)

                for item in chunk or []:
                    wxid = (item.get("wxid") or "").strip()
                    if not wxid or wxid in seen:
                        continue
                    seen.add(wxid)
                    usernames.append(wxid)

                next_chat, next_wx = _next_contact_seq(raw)
                if next_chat == 0 and next_wx == 0:
                    # Upstream reports no further page.
                    break
                if next_chat == body.get(chat_key) and next_wx == body.get(wx_key):
                    # Cursor did not advance; stop instead of re-requesting the same page.
                    break
                body = {
                    chat_key: next_chat or body.get(chat_key, 0),
                    wx_key: next_wx or body.get(wx_key, 0),
                }
                if not body[chat_key] and not body[wx_key]:
                    break
    except Exception as e:
        logger.warning("GetContactList usernames error: %s", e)

    logger.info("GetContactList usernames total=%s", len(usernames))
    return usernames
|
||
|
||
|
||
async def _build_contact_index(key: str, force_refresh: bool = False) -> Dict[str, dict]:
    """Build (and cache in ``_contact_index``) a name -> contact-details lookup.

    Steps:
      1. ``_fetch_all_contact_usernames`` lists every UserName for ``key``.
      2. ``/friend/GetContactDetailsList`` is queried in small concurrent batches.
      3. Each detail record is stored under its wxid and, when that name is
         not taken yet, under its nickname and its remark.

    Args:
        key: Account key sent upstream as the ``key`` query parameter.
        force_refresh: When true, ignore the in-memory cache and query upstream again.

    Returns:
        The index for ``key``. Several keys (wxid / nickname / remark) may
        point at the same info dict. An empty dict is returned when no
        usernames could be fetched.
    """
    if not force_refresh and key in _contact_index and _contact_index[key]:
        return _contact_index[key]

    usernames = await _fetch_all_contact_usernames(key)
    if not usernames:
        _contact_index[key] = {}
        return _contact_index[key]

    url = f"{CHECK_STATUS_BASE_URL.rstrip('/')}/friend/GetContactDetailsList"
    index: Dict[str, dict] = {}
    # Many small concurrent requests instead of passing the whole username
    # list as a single UserNames payload.
    batch_size = 10
    max_concurrent = 6
    sem = asyncio.Semaphore(max_concurrent)

    async def fetch_one_batch(client: httpx.AsyncClient, batch: List[str], batch_idx: int) -> List[dict]:
        """Fetch one batch of details; return the parsed item list (``[]`` on failure)."""
        body = {"RoomWxIDList": [], "UserNames": batch}
        async with sem:
            try:
                resp = await client.post(url, params={"key": key}, json=body)
            except Exception as e:
                logger.warning("GetContactDetailsList batch %s error: %s", batch_idx, e)
                return []
        if resp.status_code >= 400:
            logger.warning("GetContactDetailsList batch %s %s: %s", batch_idx, resp.status_code, resp.text[:200])
            return []
        raw = resp.json()
        data = raw.get("Data") or raw.get("data") or raw
        items = []
        if isinstance(data, dict):
            items = (
                data.get("List")
                or data.get("list")
                or data.get("ContactDetailsList")
                or data.get("contacts")
                or data.get("contactList")
                or data.get("ContactList")  # 7006 may use the capitalised key
                or []
            )
        elif isinstance(data, list):
            items = data
        if not isinstance(items, list):
            return []
        # Only the first batch logs the response shape, to keep the log readable.
        if not items and batch_idx == 0:
            logger.info("GetContactDetailsList batch 0: data keys=%s, no list parsed", list(data.keys()) if isinstance(data, dict) else type(data).__name__)
        if batch_idx == 0 and items:
            sample = items[0]
            if isinstance(sample, dict):
                logger.info(
                    "GetContactDetailsList first batch item keys=%s",
                    list(sample.keys()),
                )
        return items

    async with httpx.AsyncClient(trust_env=False, timeout=30.0) as client:
        batches = [usernames[i : i + batch_size] for i in range(0, len(usernames), batch_size)]
        tasks = [fetch_one_batch(client, b, i) for i, b in enumerate(batches)]
        results = await asyncio.gather(*tasks, return_exceptions=True)

    for i, one in enumerate(results):
        if isinstance(one, BaseException):
            logger.warning("GetContactDetailsList batch %s exception: %s", i, one)
            continue
        for d in one or []:
            if not isinstance(d, dict):
                continue
            # Skip only when upstream explicitly returns a bitVal other than 3;
            # a missing or unparsable bitVal keeps the contact.
            try:
                bv = d.get("bitVal")
                if bv is not None and int(bv) != 3:
                    continue
            except (TypeError, ValueError):
                pass
            wxid = _unwrap_wechat_field(
                d.get("userName") or d.get("UserName") or d.get("user_name") or d.get("wxid")
            )
            # A non-string identifier cannot be used as a wxid; treat it as
            # missing instead of letting ``.strip()`` raise and abort the build.
            wxid = wxid.strip() if isinstance(wxid, str) else ""
            if not wxid:
                continue
            nick = _unwrap_wechat_field(d.get("nickName") or d.get("NickName") or d.get("nick_name")) or ""
            nick = str(nick).strip()
            remark = _unwrap_wechat_field(
                d.get("remark") or d.get("RemarkName") or d.get("remark_name")
            ) or ""
            remark = str(remark).strip()
            pyinitial = _unwrap_wechat_field(d.get("pyinitial") or d.get("pyInitial") or d.get("PYInitial")) or ""
            pyinitial = str(pyinitial).strip()
            quan_pin = _unwrap_wechat_field(d.get("quanPin") or d.get("QuanPin") or d.get("fullPinyin")) or ""
            quan_pin = str(quan_pin).strip()
            info = {
                "wxid": wxid,
                "remark_name": remark or nick or wxid,
                "nick_name": nick,
                "pyinitial": pyinitial,
                "quan_pin": quan_pin,
                "raw": d,
            }
            # The wxid entry always wins; nickname / remark aliases never
            # overwrite a name that is already indexed.
            index[wxid] = info
            if nick and nick not in index:
                index[nick] = info
            if remark and remark not in index:
                index[remark] = info

    masked_key = key[-4:] if len(key) >= 4 else "****"
    if usernames and not index:
        logger.warning(
            "Contact index empty for key=***%s despite usernames count=%s: GetContactDetailsList may return different structure or all items filtered",
            masked_key,
            len(usernames),
        )
    _contact_index[key] = index
    logger.info("Contact index built for key=***%s, size=%s", masked_key, len(index))
    return index
|
||
|
||
|
||
async def _resolve_contact_username(key: str, name: str) -> Optional[str]:
|
||
"""
|
||
将用户提到的“昵称/备注/微信号”解析成真正的 wxid(UserName)。
|
||
返回 wxid,找不到则返回 None。
|
||
"""
|
||
if not name:
|
||
return None
|
||
idx = await _build_contact_index(key)
|
||
info = idx.get(name.strip())
|
||
if info and isinstance(info, dict):
|
||
wxid = (info.get("wxid") or info.get("UserName") or "").strip()
|
||
return wxid or None
|
||
return None
|
||
|
||
|
||
# 上游 GetContactList:传 0 拉首页,响应可能带 NextWxcontactSeq/NextChatRoomContactSeq 表示还有后续页,需循环拉取全量
|
||
def _next_contact_seq(raw: dict) -> tuple:
|
||
"""从上游响应中解析下一页的 seq,返回 (next_chatroom_seq, next_wxcontact_seq)。无下一页则返回 (0, 0)。"""
|
||
def _int(v, default: int = 0) -> int:
|
||
if v is None:
|
||
return default
|
||
try:
|
||
return int(v)
|
||
except (TypeError, ValueError):
|
||
return default
|
||
|
||
data = raw.get("Data") or raw.get("data") or raw
|
||
chatroom_seq = 0
|
||
wxcontact_seq = 0
|
||
if isinstance(data, dict):
|
||
chatroom_seq = _int(data.get("NextChatRoomContactSeq") or data.get("CurrentChatRoomContactSeq"), 0)
|
||
wxcontact_seq = _int(data.get("NextWxcontactSeq") or data.get("CurrentWxcontactSeq"), 0)
|
||
for k in ("NextChatRoomContactSeq", "NextWxcontactSeq"):
|
||
v = raw.get(k)
|
||
if v is not None:
|
||
if "ChatRoom" in k:
|
||
chatroom_seq = _int(v, 0)
|
||
else:
|
||
wxcontact_seq = _int(v, 0)
|
||
return (chatroom_seq, wxcontact_seq)
|
||
|
||
|
||
# 联系人列表等接口禁止缓存,避免 304 导致前端拿到旧数据
|
||
_NO_CACHE_HEADERS = {"Cache-Control": "no-store, no-cache, must-revalidate", "Pragma": "no-cache"}
|
||
|
||
|
||
@app.get("/api/contact-list")
async def api_contact_list(
    key: str = Query(..., description="账号 key"),
    refresh: Optional[str] = Query(None, description="传 1/true/yes 时强制重新拉取,不用内存缓存"),
):
    """Return every contact's details from the shared contact index; never cached.

    The index is built from GetContactList + GetContactDetailsList by
    ``_build_contact_index``.

    Args:
        key: Account key.
        refresh: ``1`` / ``true`` / ``yes`` (case-insensitive) forces a fresh
            upstream fetch instead of using the in-memory cache.

    Returns:
        ``{"items": [...]}`` with one entry per wxid. On any error the
        response is ``{"items": [], "error": <message>}``. Both carry
        no-cache headers.
    """
    masked_key = key[-4:] if len(key) >= 4 else "****"
    try:
        # When this function is called in-process rather than through the
        # router, an omitted `refresh` is the Query(...) default object, not
        # None; treat any non-string as "no refresh requested".
        refresh_flag = refresh.lower() if isinstance(refresh, str) else ""
        force_refresh = refresh_flag in ("1", "true", "yes")
        index = await _build_contact_index(key, force_refresh=force_refresh)

        # The index maps several names to the same contact; keep one entry per wxid.
        uniques: Dict[str, dict] = {}
        for info in index.values():
            if not isinstance(info, dict):
                continue
            wxid = (info.get("wxid") or "").strip()
            if not wxid or wxid in uniques:
                continue
            uniques[wxid] = {
                "wxid": wxid,
                "nick_name": info.get("nick_name") or "",
                # Fall back from remark to nickname to wxid.
                "remark_name": info.get("remark_name") or info.get("nick_name") or wxid,
                "pyinitial": info.get("pyinitial") or "",
                "quanPin": info.get("quan_pin") or "",
            }
        items = list(uniques.values())
        logger.info("api_contact_list key=***%s -> %s contacts", masked_key, len(items))
        return JSONResponse(content={"items": items}, headers=_NO_CACHE_HEADERS)
    except Exception as e:
        # Log the full traceback with the masked key to ease debugging of load failures.
        logger.exception("contact-list error for key=***%s: %s", masked_key, e)
        return JSONResponse(content={"items": [], "error": str(e)}, headers=_NO_CACHE_HEADERS)
|
||
|
||
|
||
@app.get("/api/friends")
async def api_list_friends(key: str = Query(..., description="账号 key")):
    """Friend list: returns the same response as ``/api/contact-list`` for ``key``.

    Args:
        key: Account key.

    Returns:
        Whatever ``api_contact_list`` returns, using the cached index when available.
    """
    # `refresh` must be passed explicitly: calling the route function directly
    # bypasses FastAPI's parameter resolution, so an omitted argument would be
    # the Query(...) default object instead of None and the callee's
    # `.lower()` call would fail, yielding an empty list plus an error.
    return await api_contact_list(key, refresh=None)
|
||
|
||
|
||
def _friends_fallback(key: str) -> List[dict]:
    """Offer stored customer profiles as selectable contacts (e.g. for mass messaging).

    Args:
        key: Account key passed to ``store.list_customers``.

    Returns:
        One ``{"wxid", "remark_name", "id"}`` dict per customer that has a
        wxid; ``remark_name`` falls back to the wxid.
    """
    fallback: List[dict] = []
    for customer in store.list_customers(key):
        if not customer.get("wxid"):
            continue
        fallback.append(
            {
                "wxid": customer.get("wxid"),
                "remark_name": customer.get("remark_name") or customer.get("wxid"),
                "id": customer.get("id"),
            }
        )
    return fallback
|
||
|
||
|
||
# ---------- AI takeover reply config (whitelist + super admins) ----------
class AIReplyConfigUpdate(BaseModel):
    """Request body for updating an account's AI auto-reply configuration."""

    # Account key the configuration belongs to.
    key: str
    # wxids of super administrators; None when the field is omitted.
    super_admin_wxids: Optional[List[str]] = None
    # wxids on the AI-reply whitelist; None when the field is omitted.
    whitelist_wxids: Optional[List[str]] = None
|
||
|
||
|
||
@app.get("/api/ai-reply-config")
async def api_get_ai_reply_config(key: str = Query(..., description="账号 key")):
    """Return the account's AI reply config: super-admin and whitelist wxid lists.

    When nothing is stored for ``key`` an empty default config is returned.
    """
    empty_config = {"key": key, "super_admin_wxids": [], "whitelist_wxids": []}
    return store.get_ai_reply_config(key) or empty_config
|
||
|
||
|
||
@app.patch("/api/ai-reply-config")
async def api_update_ai_reply_config(body: AIReplyConfigUpdate):
    """Set the AI reply whitelist and super admins for ``body.key``.

    Per the original note, only contacts on these lists receive AI
    auto-replies. Returns whatever ``store.update_ai_reply_config`` returns.
    """
    updated = store.update_ai_reply_config(
        body.key,
        super_admin_wxids=body.super_admin_wxids,
        whitelist_wxids=body.whitelist_wxids,
    )
    return updated
|
||
|
||
|
||
@app.get("/api/ai-reply-status")
async def api_ai_reply_status(key: str = Query(..., description="账号 key")):
    """Report whether AI takeover can work for this account.

    Checks three things: the WS sync connection, whether a whitelist or
    super-admin list is configured, and whether a current model with an API
    key exists. ``message`` names the first failing check.
    """
    ws_ok = is_ws_connected()
    cfg = store.get_ai_reply_config(key)
    if cfg:
        super_list = cfg.get("super_admin_wxids") or []
        white_list = cfg.get("whitelist_wxids") or []
    else:
        super_list = []
        white_list = []
    has_allow_list = bool(super_list or white_list)

    model = store.get_current_model()
    has_model = bool(model and model.get("api_key"))
    ok = ws_ok and has_allow_list and has_model

    # Pick the message for the first check that fails, in a fixed order.
    if ok:
        message = "正常"
    elif not ws_ok:
        message = "未连接消息同步(WS)"
    elif not has_allow_list:
        message = "请在「AI 回复设置」添加并保存超级管理员或白名单"
    elif not has_model:
        message = "请在「模型管理」添加并选中当前模型"
    else:
        message = "未知"

    return {
        "ok": ok,
        "ws_connected": ws_ok,
        "has_ai_reply_config": bool(cfg),
        "has_whitelist_or_super_admin": has_allow_list,
        "super_admin_count": len(super_list),
        "whitelist_count": len(white_list),
        "has_current_model": has_model,
        "message": message,
    }
|
||
|
||
|
||
# ---------- Model management (switch between models; API key configured per model) ----------
class ModelCreate(BaseModel):
    """Request body for registering a new LLM model entry."""

    # Display name of the model entry.
    name: str
    # Provider id; the create endpoint accepts qwen | openai | doubao.
    provider: str
    # API key used when calling this model.
    api_key: str
    # Optional custom API base URL.
    base_url: Optional[str] = ""
    # Optional provider-side model name.
    model_name: Optional[str] = ""
    # Whether this entry should become the current model.
    is_current: Optional[bool] = False
|
||
|
||
|
||
class ModelUpdate(BaseModel):
    """Request body for patching a model entry; every field is optional."""

    # Each field defaults to None when omitted from the request.
    name: Optional[str] = None
    api_key: Optional[str] = None
    base_url: Optional[str] = None
    model_name: Optional[str] = None
|
||
|
||
|
||
def _mask_api_key(m: dict) -> dict:
|
||
if not m or not isinstance(m, dict):
|
||
return m
|
||
out = dict(m)
|
||
if out.get("api_key"):
|
||
out["api_key"] = "***"
|
||
return out
|
||
|
||
|
||
@app.get("/api/models")
async def api_list_models():
    """List all stored models with their API keys masked."""
    masked_models = list(map(_mask_api_key, store.list_models()))
    return {"items": masked_models}
|
||
|
||
|
||
@app.get("/api/models/current")
async def api_get_current_model():
    """Return the current model (API key masked), or ``{"current": None}`` if none is set."""
    current = store.get_current_model()
    return {"current": _mask_api_key(current) if current else None}
|
||
|
||
|
||
@app.post("/api/models")
async def api_create_model(body: ModelCreate):
    """Create a model entry and return it with the API key masked.

    Raises:
        HTTPException: 400 when ``provider`` is not qwen, openai or doubao.
    """
    supported_providers = ("qwen", "openai", "doubao")
    if body.provider not in supported_providers:
        raise HTTPException(status_code=400, detail="provider must be qwen, openai or doubao")

    created = store.create_model(
        name=body.name,
        provider=body.provider,
        api_key=body.api_key,
        # Optional fields may arrive as None; the store receives plain defaults.
        base_url=body.base_url or "",
        model_name=body.model_name or "",
        is_current=body.is_current or False,
    )
    return _mask_api_key(created)
|
||
|
||
|
||
@app.patch("/api/models/{model_id}")
async def api_update_model(model_id: str, body: ModelUpdate):
    """Patch a model entry and return it with the API key masked.

    Raises:
        HTTPException: 404 when the store returns no row for ``model_id``.
    """
    changes = {
        "name": body.name,
        "api_key": body.api_key,
        "base_url": body.base_url,
        "model_name": body.model_name,
    }
    updated = store.update_model(model_id, **changes)
    if updated:
        return _mask_api_key(updated)
    raise HTTPException(status_code=404, detail="model not found")
|
||
|
||
|
||
@app.post("/api/models/{model_id}/set-current")
async def api_set_current_model(model_id: str):
    """Mark ``model_id`` as the current model and return it with the API key masked.

    Raises:
        HTTPException: 404 when the store returns no row for ``model_id``.
    """
    selected = store.set_current_model(model_id)
    if selected:
        return _mask_api_key(selected)
    raise HTTPException(status_code=404, detail="model not found")
|
||
|
||
|
||
@app.delete("/api/models/{model_id}")
async def api_delete_model(model_id: str):
    """Delete a model entry.

    Raises:
        HTTPException: 404 when ``store.delete_model`` reports nothing was deleted.
    """
    deleted = store.delete_model(model_id)
    if deleted:
        return {"ok": True}
    raise HTTPException(status_code=404, detail="model not found")
|
||
|
||
|
||
@app.post("/api/qwen/generate")
async def api_qwen_generate(body: QwenGenerateBody):
    """Generate text through ``llm_chat``.

    The original note states generation is handled by the currently selected
    model, with no environment-variable fallback.

    Raises:
        HTTPException: 503 when ``llm_chat`` returns ``None``.
    """
    # An optional system prompt goes first, followed by the user prompt.
    messages = [{"role": "system", "content": body.system}] if body.system else []
    messages.append({"role": "user", "content": body.prompt})

    text = await llm_chat(messages)
    if text is not None:
        return {"text": text}
    raise HTTPException(status_code=503, detail="请在「模型管理」页添加并选中模型、填写 API Key")
|
||
|
||
|
||
@app.post("/api/qwen/generate-greeting")
async def api_qwen_generate_greeting(
    remark_name: str = Query(...),
    region: str = Query(""),
    tags: Optional[str] = Query(None),
):
    """Generate a short WeChat greeting for a customer through ``llm_chat``.

    Args:
        remark_name: Customer remark name, always included in the prompt.
        region: Optional region, included when non-empty.
        tags: Optional comma-separated tags; blank entries are dropped.

    Raises:
        HTTPException: 503 when ``llm_chat`` returns ``None``.
    """
    stripped_tags = (t.strip() for t in (tags or "").split(","))
    tag_list = [t for t in stripped_tags if t]

    # Assemble the prompt piece by piece; optional parts are skipped when empty.
    prompt_parts = [f"请生成一句简短的微信问候语(1-2句话),客户备注名:{remark_name}"]
    if region:
        prompt_parts.append(f",地区:{region}")
    if tag_list:
        prompt_parts.append(f",标签:{','.join(tag_list)}")
    prompt_parts.append("。不要解释,只输出问候语本身。")
    user = "".join(prompt_parts)

    text = await llm_chat([{"role": "user", "content": user}])
    if text is not None:
        return {"text": text}
    raise HTTPException(status_code=503, detail="请在「模型管理」页添加并选中模型、填写 API Key")
|
||
|
||
|
||
class LogoutBody(BaseModel):
    """Request body for ``POST /auth/logout``."""

    # Account key to log out upstream.
    key: str
|
||
|
||
|
||
@app.post("/auth/logout")
async def logout(body: LogoutBody):
    """Log the account out by calling the upstream ``/login/LogOut`` endpoint.

    Args:
        body: Request body carrying the account ``key``.

    Returns:
        The upstream JSON response, decoded.

    Raises:
        HTTPException: 400 when ``key`` is empty; 502 when the upstream call
            fails or its response body is not valid JSON.
    """
    key = body.key
    if not key:
        raise HTTPException(status_code=400, detail="key is required")

    url = f"{WECHAT_UPSTREAM_BASE_URL}/login/LogOut"
    # The key is a credential: log only its last 4 characters, as the
    # contact-list endpoints in this file do.
    logger.info("LogOut: key=***%s, url=%s", key[-4:] if len(key) >= 4 else "****", url)
    try:
        async with httpx.AsyncClient(trust_env=False, timeout=15.0) as client:
            resp = await client.get(url, params={"key": key})
    except Exception as exc:
        logger.exception("Error calling upstream LogOut: %s", exc)
        raise HTTPException(status_code=502, detail=f"upstream_error: {exc}") from exc

    body_text = resp.text[:500]
    logger.info(
        "Upstream LogOut response: status=%s, body=%s",
        resp.status_code,
        body_text,
    )
    try:
        return resp.json()
    except ValueError as exc:
        # A non-JSON body (e.g. an HTML error page) is an upstream fault;
        # report it as 502 instead of letting it surface as an unhandled 500.
        logger.warning("Upstream LogOut returned non-JSON body: %s", exc)
        raise HTTPException(status_code=502, detail="upstream_error: invalid JSON response") from exc
|
||
|
||
|
||
# Static pages directory: per the original note this mirrors the Node server,
# so the static pages are also reachable when hitting the backend directly.
_PUBLIC_DIR = os.path.join(os.path.dirname(__file__), "..", "public")
if os.path.isdir(_PUBLIC_DIR):
    # Mounted at "/" only when the directory actually exists.
    app.mount(
        "/",
        StaticFiles(directory=_PUBLIC_DIR, html=True),
        name="static",
    )
|
||
|