Files
wechatAiclaw/backend/main.py
2026-03-15 19:17:12 +08:00

2193 lines
91 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import asyncio
import html
import logging
import os
from contextlib import asynccontextmanager
# 优先加载项目根目录的 .env不依赖当前工作目录使 HTTP_PROXY/HTTPS_PROXY 等生效
try:
from dotenv import load_dotenv
_root = os.path.abspath(os.path.join(os.path.dirname(__file__), ".."))
load_dotenv(os.path.join(_root, ".env"))
except ImportError:
pass
from logging.handlers import RotatingFileHandler
from datetime import datetime
from typing import Any, Dict, List, Optional
from urllib.parse import urlencode
import httpx
from fastapi import FastAPI, HTTPException, Query, Request
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import HTMLResponse, JSONResponse, Response
from fastapi.staticfiles import StaticFiles
from pydantic import BaseModel
try:
from backend import store
from backend.llm_client import chat as llm_chat
from backend.ws_sync import is_ws_connected, set_message_callback, start_ws_sync
except ImportError:
import store
from llm_client import chat as llm_chat
from ws_sync import is_ws_connected, set_message_callback, start_ws_sync
# Base URL of the upstream WeChat login service (trailing "/" removed).
WECHAT_UPSTREAM_BASE_URL = os.getenv("WECHAT_UPSTREAM_BASE_URL", "http://localhost:8080").rstrip("/")
# Base URL of the "7006" service: used for CheckLoginStatus, SetCallback and, by default, image sending.
CHECK_STATUS_BASE_URL = os.getenv("CHECK_STATUS_BASE_URL", "http://113.44.162.180:7006").rstrip("/")
# Real-time message callback: when set, the 7006 service POSTs new messages to this
# address, which then acts as the main receiving entry point (matches SetCallback).
CALLBACK_BASE_URL = (os.getenv("CALLBACK_BASE_URL") or "").strip().rstrip("/")
# Base URL of the slider-verification service ("7765").
SLIDER_VERIFY_BASE_URL = os.getenv("SLIDER_VERIFY_BASE_URL", "http://113.44.162.180:7765").rstrip("/")
# Key for the 7765 slider service. It is unrelated to the account key; the default is the
# provider's QQ number and it is required when using the provider's public service.
SLIDER_VERIFY_KEY = os.getenv("SLIDER_VERIFY_KEY", "408449830")
# Send-text-message path (swagger: POST /message/SendTextMessage, body is a
# SendMessageModel holding an array of MsgItem).
SEND_MSG_PATH = (os.getenv("SEND_MSG_PATH") or "/message/SendTextMessage").strip()
# Send-image-message endpoint (7006: /message/SendImageMessage, MsgItem carries
# ImageContent and MsgType=0). The base URL falls back to CHECK_STATUS_BASE_URL.
SEND_IMAGE_UPSTREAM_BASE_URL = (os.getenv("SEND_IMAGE_UPSTREAM_BASE_URL") or "").strip() or CHECK_STATUS_BASE_URL
SEND_IMAGE_PATH = (os.getenv("SEND_IMAGE_PATH") or "").strip() or "/message/SendImageMessage"
# MsgType used for image messages (0 for 7006 SendImageMessage).
IMAGE_MSG_TYPE = int(os.getenv("IMAGE_MSG_TYPE", "0"))
# Per-key cache of the QR-code response and its Data62, for use by later login steps.
qrcode_store: dict = {}
# Per-key contact index: name (WeChat id / nickname / remark) -> contact details.
_contact_index: Dict[str, Dict[str, dict]] = {}
# Shared log line format for both the console and the file handler.
_LOG_FMT = "%(asctime)s [%(levelname)s] %(name)s - %(message)s"
logging.basicConfig(level=logging.INFO, format=_LOG_FMT)
# Persist logs to <DATA_DIR>/logs/app.log for troubleshooting (LOG_DIR overrides the directory).
_data_dir = os.getenv("DATA_DIR") or os.path.join(os.path.dirname(__file__), "data")
_log_dir = os.getenv("LOG_DIR") or os.path.join(_data_dir, "logs")
os.makedirs(_log_dir, exist_ok=True)
_app_log = os.path.join(_log_dir, "app.log")
# Rotate at 5 MiB, keeping 5 backups.
_file_handler = RotatingFileHandler(_app_log, maxBytes=5 * 1024 * 1024, backupCount=5, encoding="utf-8")
_file_handler.setFormatter(logging.Formatter(_LOG_FMT))
# Attach to the root logger so every module's records reach the file.
logging.getLogger().addHandler(_file_handler)
logger = logging.getLogger("wechat-backend")
def _is_self_sent(msg: dict) -> bool:
    """Return True when the message was sent by the logged-in account itself.

    Such messages must not be answered by the AI. A message counts as
    self-sent when its ``direction`` is ``"out"`` or its ``IsSelf`` flag is
    one of ``1``, ``True`` or ``"1"``.
    """
    return msg.get("direction") == "out" or msg.get("IsSelf") in (1, True, "1")
def _allowed_ai_reply(key: str, from_user: str) -> bool:
    """Decide whether a sender is entitled to an AI reply.

    Only super administrators and whitelisted contacts configured for
    ``key`` get a reply; everybody else (and any blank sender) is refused.
    """
    sender = from_user.strip() if from_user else ""
    if not sender:
        return False

    cfg = store.get_ai_reply_config(key)
    if not cfg:
        logger.debug("AI reply skipped: no config for key=%s (请在管理页「AI 回复设置」保存超级管理员/白名单)", key[:8])
        return False

    admins = set(cfg.get("super_admin_wxids") or [])
    whitelisted = set(cfg.get("whitelist_wxids") or [])
    if sender in admins or sender in whitelisted:
        return True

    logger.debug("AI reply skipped: from_user=%s not in whitelist/super_admin for key=%s", from_user[:20], key[:8])
    return False
def _is_super_admin(key: str, from_user: str) -> bool:
    """Return True when the sender is a super administrator for ``key``.

    Only super administrators may trigger function calls such as sending a
    message on someone's behalf; whitelisted contacts only get plain replies.
    """
    sender = from_user.strip() if from_user else ""
    if not sender:
        return False
    cfg = store.get_ai_reply_config(key)
    if not cfg:
        return False
    return sender in set(cfg.get("super_admin_wxids") or [])
async def _ai_takeover_reply(key: str, from_user: str, content: str) -> None:
    """Let the AI take over an incoming message from another contact.

    Builds a short chat context from recently stored messages, asks the LLM
    for a JSON "action", and then either replies to the sender or (super
    administrators only) sends a message to another contact on their behalf.
    Any exception is logged and swallowed so the caller is never interrupted.

    Args:
        key: Account key the message belongs to.
        from_user: Identifier of the contact that sent the message.
        content: Text of the incoming message.
    """
    if not from_user or not content or not content.strip():
        return
    try:
        recent = store.list_sync_messages(key, limit=10)
        # Keep only the last few messages exchanged with this user as context
        # (simplified: just take the most recent ones).
        context = []
        for m in reversed(recent):
            c = (m.get("Content") or m.get("content") or "").strip()
            if not c:
                continue
            if m.get("direction") == "out" and (m.get("ToUserName") or "").strip() == from_user:
                context.append({"role": "assistant", "content": c})
            elif (m.get("FromUserName") or m.get("from") or "").strip() == from_user and not _is_self_sent(m):
                context.append({"role": "user", "content": c})
            # Cap the context at six entries.
            if len(context) >= 6:
                break
        # Make sure the conversation handed to the model ends with a user turn.
        if not context or context[-1].get("role") != "user":
            context.append({"role": "user", "content": content})
        # Function-call style: the model chooses between a direct reply and an
        # action such as "send a message on the user's behalf".
        system_prompt = (
            "你是微信客服助手,负责根据用户消息决定是直接回复,还是调用内置动作。"
            "只用 JSON 回复,格式如下(不要多余文本):\n"
            "{\n"
            ' \"type\": \"reply\" | \"send_message\",\n'
            ' \"reply\"?: \"直接回复给当前联系人的内容(当 type=reply 时必填)\",\n'
            ' \"target_wxid\"?: \"要代发消息的对象 wxid当 type=send_message 时必填,可以是当前联系人或其他 wxid\",\n'
            ' \"content\"?: \"要代发的消息内容(当 type=send_message 时必填)\"\n'
            "}\n"
            "注意:\n"
            "1如果用户只是正常聊天优先使用 type=reply\n"
            "2只有当用户明确指令你“帮我给某人发消息/转告/通知 xxx”时才使用 type=send_message\n"
            "3严禁凭空编造 target_wxid若无法确定对方 wxid请继续使用 type=reply 询问用户;\n"
            "4不要用自然语言解释 JSON只输出 JSON 本身。"
        )
        messages = [{"role": "system", "content": system_prompt}, *context]
        raw = await llm_chat(messages)
        if not raw or not raw.strip():
            return
        action = None
        try:
            import json as _json
            action = _json.loads(raw)
        except Exception:
            # Not valid JSON: fall back to sending the raw text as a plain reply.
            reply_text = raw.strip()
            await _send_message_upstream(key, from_user, reply_text)
            logger.info("AI takeover replied (fallback) to %s: %s", from_user[:20], reply_text[:50])
            return
        # Valid JSON that is not an action object is ignored silently.
        if not isinstance(action, dict) or "type" not in action:
            return
        action_type = str(action.get("type") or "").strip()
        if action_type == "send_message":
            # Sending on someone's behalf is restricted to super administrators.
            if not _is_super_admin(key, from_user):
                await _send_message_upstream(
                    key, from_user,
                    "代发消息等操作仅限超级管理员使用,您可正常聊天获得回复。"
                )
                logger.info("AI send_message rejected: from_user not super_admin (key=***%s)", key[-4:] if len(key) >= 4 else "****")
                return
            target = str(action.get("target_wxid") or "").strip()
            msg = str(action.get("content") or "").strip()
            # When either target or content is missing, nothing is sent.
            if target and msg:
                # Resolve a nickname / remark / WeChat id to the real wxid first.
                real_wxid = await _resolve_contact_username(key, target)
                if not real_wxid:
                    # Tell the current contact that the target could not be found.
                    warn = f"未找到名为「{target}」的联系人,请确认称呼或直接提供对方微信号。"
                    await _send_message_upstream(key, from_user, warn)
                    logger.info("AI send_message resolve failed for %s (key=***%s)", target, key[-4:] if len(key) >= 4 else "****")
                    return
                await _send_message_upstream(key, real_wxid, msg)
                logger.info("AI function-call send_message to %s (raw=%s): %s", real_wxid[:20], target[:20], msg[:50])
        elif action_type == "reply":
            reply_text = str(action.get("reply") or "").strip()
            if reply_text:
                await _send_message_upstream(key, from_user, reply_text)
                logger.info("AI takeover replied to %s: %s", from_user[:20], reply_text[:50])
    except Exception as e:
        logger.exception("AI takeover reply error (from=%s): %s", from_user, e)
# Strong references to in-flight AI reply tasks. The event loop only keeps weak
# references to tasks, so an unreferenced task may be garbage-collected mid-run.
_ai_reply_tasks: set = set()


def _first_message_text(m: dict, *names: str) -> str:
    """Return the first non-blank text found under ``names`` in ``m``.

    Plain strings are used directly; any other truthy value (e.g. a wrapped
    field object) is passed through ``_unwrap_wechat_field`` first. Values
    that do not yield a string are skipped instead of raising.
    """
    for name in names:
        value = m.get(name)
        if not value:
            continue
        if not isinstance(value, str):
            value = _unwrap_wechat_field(value)
        if isinstance(value, str) and value.strip():
            return value.strip()
    return ""


def _schedule_ai_reply(key: str, m: Any) -> None:
    """Start an AI reply task for one message when it qualifies.

    A message qualifies when it is a dict, was not sent by this account, has a
    sender and text content, is a text message (MsgType 1 or absent) and the
    sender is allowed to receive AI replies.
    """
    if not isinstance(m, dict) or _is_self_sent(m):
        return
    from_user = _first_message_text(m, "FromUserName", "from", "from_user_name")
    content = _first_message_text(m, "Content", "content")
    msg_type = m.get("MsgType") or m.get("msgType")
    is_text = msg_type in (1, None) or str(msg_type) == "1"  # only text triggers the AI
    if not (from_user and content and is_text):
        return
    if not _allowed_ai_reply(key, from_user):
        return
    try:
        loop = asyncio.get_running_loop()
    except RuntimeError:
        # Called outside a running event loop: the reply cannot be scheduled.
        logger.warning("AI reply not scheduled for %s: no running event loop", from_user[:20])
        return
    task = loop.create_task(_ai_takeover_reply(key, from_user, content))
    _ai_reply_tasks.add(task)
    task.add_done_callback(_ai_reply_tasks.discard)


def _on_ws_message(key: str, data: Any) -> None:
    """Handle data received from GetSyncMsg or the message callback.

    The payload is written to the store, then every message from another
    contact is offered to the AI takeover logic. Accepted payload shapes:

    1. a dict holding a non-empty list under ``MsgList`` / ``List`` / ``msgList``;
    2. a list of messages (e.g. a callback already normalised to ``[msg]``);
    3. anything else, which is stored as a single message.

    Args:
        key: Account key the data belongs to.
        data: Raw payload in one of the shapes above.
    """
    if isinstance(data, dict):
        msg_list = data.get("MsgList") or data.get("List") or data.get("msgList")
        batch = msg_list if isinstance(msg_list, list) and msg_list else [data]
    elif isinstance(data, list):
        batch = data
    else:
        batch = [data]
    store.append_sync_messages(key, batch)
    for m in batch:
        _schedule_ai_reply(key, m)
async def _register_message_callback(key: str) -> bool:
    """Register this service as the message callback for ``key`` on the 7006 service.

    Issues ``POST /message/SetCallback?key=...`` so that new messages are
    pushed to ``<CALLBACK_BASE_URL>/api/callback/wechat-message``.

    Returns:
        True when the upstream accepted the registration; False when no
        callback base URL or key is configured, the upstream answered with a
        status >= 400, or the request raised.
    """
    if not (CALLBACK_BASE_URL and key):
        return False
    # Only the last four characters of the key ever reach the log.
    key_tail = key[-4:] if len(key) >= 4 else "****"
    endpoint = f"{CHECK_STATUS_BASE_URL.rstrip('/')}/message/SetCallback"
    callback_url = f"{CALLBACK_BASE_URL.rstrip('/')}/api/callback/wechat-message"
    try:
        async with httpx.AsyncClient(trust_env=False, timeout=10.0) as client:
            resp = await client.post(
                endpoint,
                params={"key": key},
                json={"CallbackURL": callback_url, "Enabled": True},
            )
        if resp.status_code < 400:
            logger.info("SetCallback registered for key=***%s, CallbackURL=%s", key_tail, callback_url)
            return True
        logger.warning("SetCallback %s key=%s: %s %s", endpoint, key_tail, resp.status_code, resp.text[:200])
        return False
    except Exception as e:
        logger.warning("SetCallback error key=%s: %s", key_tail, e)
        return False
async def _run_greeting_scheduler() -> None:
    """Periodically run due greeting tasks.

    Every ``check_interval`` seconds all greeting tasks are loaded; each
    enabled, not-yet-executed task whose send time has passed is sent to the
    matching customers through the send-message interface and then marked as
    executed. The loop ends when the task is cancelled; any other error is
    logged and the loop continues.
    """
    check_interval = 30
    while True:
        try:
            # Sleeping inside the try lets cancellation during the wait exit cleanly.
            await asyncio.sleep(check_interval)
            now = datetime.now()
            all_tasks = store.list_greeting_tasks(key=None)
            for task in all_tasks:
                if not task.get("enabled"):
                    continue
                if task.get("executed_at"):
                    continue
                send_time = task.get("send_time") or task.get("cron")
                if not send_time:
                    continue
                dt = _parse_send_time(send_time)
                # Skip unparsable times and tasks that are not due yet.
                if not dt or dt > now:
                    continue
                task_id = task.get("id")
                key = task.get("key")
                customer_tags = set(task.get("customer_tags") or [])
                template = (task.get("template") or "").strip() or "{{name}},您好!"
                use_qwen = bool(task.get("use_qwen"))
                customers = store.list_customers(key)
                # With tags configured, keep only customers sharing at least one tag.
                if customer_tags:
                    customers = [c for c in customers if set(c.get("tags") or []) & customer_tags]
                for c in customers:
                    wxid = c.get("wxid")
                    if not wxid:
                        continue
                    remark_name = (c.get("remark_name") or "").strip() or wxid
                    if use_qwen:
                        # Ask the LLM for a personalised greeting; fall back to the
                        # template when the call fails or returns nothing.
                        user = f"请生成一句简短的微信问候语1-2句话客户备注名{remark_name}"
                        region = (c.get("region") or "").strip()
                        if region:
                            user += f",地区:{region}"
                        tags = c.get("tags") or []
                        if tags:
                            user += f",标签:{','.join(tags)}"
                        user += "。不要解释,只输出问候语本身。"
                        try:
                            content = await llm_chat([{"role": "user", "content": user}])
                        except Exception as e:
                            logger.warning("Greeting task %s llm_chat error: %s", task_id, e)
                            content = template.replace("{{name}}", remark_name)
                        if not content or not content.strip():
                            content = template.replace("{{name}}", remark_name)
                    else:
                        content = template.replace("{{name}}", remark_name)
                    try:
                        await _send_message_upstream(key, wxid, content)
                        logger.info("Greeting task %s sent to %s", task_id, wxid)
                    except Exception as e:
                        # A failed send is logged; the remaining customers are still processed.
                        logger.warning("Greeting task %s send to %s failed: %s", task_id, wxid, e)
                # The task is marked executed and disabled even if some sends failed.
                store.update_greeting_task(task_id, executed_at=now.isoformat(), enabled=False)
                logger.info("Greeting task %s executed_at set", task_id)
        except asyncio.CancelledError:
            break
        except Exception as e:
            logger.exception("Greeting scheduler error: %s", e)
@asynccontextmanager
async def lifespan(app: FastAPI):
    """Application lifespan: wire up message intake and the greeting scheduler.

    On startup the message handler is installed. With ``CALLBACK_BASE_URL``
    configured, the callback is registered for every stored key plus the key
    from ``WECHAT_WS_KEY`` / ``KEY`` / ``WS_KEY``; otherwise, or when there is
    no key to register, the WS sync is started instead. On shutdown the
    greeting scheduler task is cancelled and awaited.
    """
    set_message_callback(_on_ws_message)
    _callback_key = (os.getenv("WECHAT_WS_KEY") or os.getenv("KEY") or os.getenv("WS_KEY") or "").strip()
    if CALLBACK_BASE_URL:
        keys_to_register = set(store.list_all_keys())
        if _callback_key:
            keys_to_register.add(_callback_key)
        for k in keys_to_register:
            if k and k.strip():
                ok = await _register_message_callback(k.strip())
                if ok:
                    logger.info("启动时已注册回调 key=***%s", k[-4:] if len(k) >= 4 else "****")
        # NOTE(review): the WS sync is skipped whenever there was at least one key,
        # even if every registration above failed -- confirm this is intended.
        if keys_to_register:
            logger.info("消息接收已切换为实时回调入口,不再启动 WS GetSyncMsg")
        else:
            asyncio.create_task(start_ws_sync())
    else:
        # NOTE(review): the task returned by create_task is not kept; asyncio
        # recommends holding a reference to background tasks.
        asyncio.create_task(start_ws_sync())
    scheduler = asyncio.create_task(_run_greeting_scheduler())
    yield
    # Shutdown: stop the scheduler and wait for it to finish.
    scheduler.cancel()
    try:
        await scheduler
    except asyncio.CancelledError:
        pass
# FastAPI application; startup/shutdown work is handled by ``lifespan``.
app = FastAPI(title="WeChat Admin Backend (FastAPI)", lifespan=lifespan)
# CORS: every origin, method and header is accepted.
# NOTE(review): a wildcard origin combined with allow_credentials=True is very
# permissive -- confirm this is acceptable for the deployment.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)
class QrCodeRequest(BaseModel):
    # Request body of POST /auth/qrcode.
    # Account key; forwarded upstream as the ``key`` query parameter.
    key: str
    # Optional proxy URL; when blank the backend resolves one itself.
    Proxy: Optional[str] = ""
    # Device type; the handler defaults a blank value to "ipad".
    IpadOrmac: Optional[str] = ""
    # Accepted for compatibility; the handler always sends Check=False upstream.
    Check: Optional[bool] = False
    # The string below describes ``force_mac``: pass True only when a slider is
    # required and it is the "no digits" kind, so the QR code is re-fetched as a Mac
    # device; use iPad in every other case (sending Mac risks an account ban).
    """仅当需滑块且为「无数字」时传 True重新取码用 Mac 设备;其余一律 iPad传 Mac 易封号。"""
    force_mac: Optional[bool] = False
class WakeUpRequest(BaseModel):
    """Wake-up login request (QR-code logins only): calls WakeUpLogin without fetching a QR code."""
    # Account key; forwarded upstream as the ``key`` query parameter.
    key: str
    # Forwarded unchanged as the upstream ``Check`` flag.
    Check: Optional[bool] = False
    # Device type; the handler defaults a blank value to "ipad".
    IpadOrmac: Optional[str] = "ipad"
    # Optional proxy URL; when blank the backend resolves one itself.
    Proxy: Optional[str] = ""
@app.middleware("http")
async def log_requests(request: Request, call_next):
    """Log every HTTP request on arrival and again with its response status."""
    peer = request.client.host if request.client else "-"
    logger.info("HTTP %s %s from %s", request.method, request.url.path, peer)
    response = await call_next(request)
    logger.info("HTTP %s %s -> %s", request.method, request.url.path, response.status_code)
    return response
@app.get("/health")
async def health() -> dict:
    """Liveness probe: report the backend type and the configured upstream URL."""
    logger.info("Health check")
    payload = {"status": "ok", "backend": "fastapi"}
    payload["upstream"] = WECHAT_UPSTREAM_BASE_URL
    return payload
@app.get("/api/ws-status")
async def api_ws_status() -> dict:
    """Report the WS (GetSyncMsg) connection state so the frontend can redirect to the login page when it drops."""
    connected = is_ws_connected()
    return {"connected": connected}
# Proxy check: request a test URL through the selected proxy to verify it works.
PROXY_CHECK_URL = os.getenv("PROXY_CHECK_URL", "https://httpbin.org/ip")


@app.get("/api/check-proxy")
async def api_check_proxy(proxy: Optional[str] = Query(None, description="可选,指定要检测的代理 URL不传则用环境变量/隧道/KDL")):
    """Check whether a proxy is usable.

    The proxy comes from the ``proxy`` query parameter or, when that is blank,
    from the automatic resolution chain (environment, tunnel, KDL API). The
    test URL is fetched through it and the response reports success, the
    proxy's source and a credential-masked preview.
    """
    candidate = (proxy or "").strip()
    source = "query"
    if not candidate:
        candidate = _resolve_proxy("", allow_auto=True)
        if candidate == "__tunnel__":
            candidate = _proxy_from_tunnel()
            source = "tunnel"
            logger.info("check-proxy: using tunnel -> %s", "socks5h://***@%s/" % TUNNEL_PROXY)
        elif candidate == "__kdl__":
            candidate = await _proxy_from_kdl()
            source = "kdl"
            kdl_host = candidate.split("@", 1)[-1].rstrip("/") if candidate else "?"
            logger.info("check-proxy: using kdl -> %s", "http://***@%s/" % kdl_host)
        elif candidate:
            source = "env"
            logger.info("check-proxy: using env/body, len=%s", len(candidate))
        else:
            source = "none"

    if not candidate:
        return {
            "ok": False,
            "source": "none",
            "error": "未配置代理。请填写代理、或设置 HTTP_PROXY/HTTPS_PROXY、或配置 TUNNEL_PROXY固定隧道、或 KDL 代理 API。",
        }

    def _preview(u: str) -> str:
        """Mask the password (or whole user-info) of a proxy URL for display."""
        if not u or "@" not in u:
            return u[:50]
        left, host = u.rsplit("@", 1)
        scheme, marker, creds = left.partition("://")
        if not marker:
            masked = "***"
        elif ":" in creds:
            masked = f"{scheme}://{creds.split(':', 1)[0]}:***"
        else:
            masked = f"{scheme}://***"
        return f"{masked}@{host[:30]}"

    preview = _preview(candidate)
    logger.info("check-proxy: source=%s, proxy_preview=%s", source, preview)

    def _outcome(resp, ok_fmt: str, fail_fmt: str) -> dict:
        """Turn the test-page response into the API result, logging it on the way."""
        code = resp.status_code
        if code == 200:
            logger.info(ok_fmt, code)
            return {
                "ok": True,
                "source": source,
                "proxy_preview": preview,
                "check_url": PROXY_CHECK_URL,
                "status_code": code,
            }
        logger.warning(fail_fmt, code)
        return {
            "ok": False,
            "source": source,
            "proxy_preview": preview,
            "error": f"请求测试页返回 {code}",
            "status_code": code,
        }

    def _errored(err: Exception) -> dict:
        """Build the API result for a request that raised."""
        return {
            "ok": False,
            "source": source,
            "proxy_preview": preview,
            "error": str(err),
        }

    if candidate.strip().lower().startswith("socks5"):
        try:
            from httpx_socks import AsyncProxyTransport

            # httpx_socks/urllib only recognise socks5, so socks5h is rewritten.
            socks_url = candidate.strip()
            if socks_url.lower().startswith("socks5h://"):
                socks_url = "socks5://" + socks_url[10:]
            transport = AsyncProxyTransport.from_url(socks_url)
            async with httpx.AsyncClient(trust_env=False, timeout=15.0, transport=transport) as client:
                resp = await client.get(PROXY_CHECK_URL)
                return _outcome(
                    resp,
                    "check-proxy: ok (socks), status=%s",
                    "check-proxy: fail (socks), status=%s",
                )
        except ImportError:
            # Without httpx-socks the connectivity test is skipped and reported as ok.
            logger.info("check-proxy: socks5 已配置,跳过连通性检测(需 pip install httpx-socks 方可检测)")
            return {
                "ok": True,
                "source": source,
                "proxy_preview": preview,
                "note": "socks5 代理已配置;连通性检测需安装 pip install httpx-socks",
            }
        except Exception as e:
            logger.warning("check-proxy: socks exception %s", e)
            return _errored(e)

    try:
        async with httpx.AsyncClient(trust_env=False, timeout=15.0, proxy=candidate) as client:
            resp = await client.get(PROXY_CHECK_URL)
            return _outcome(resp, "check-proxy: ok, status=%s", "check-proxy: fail, status=%s")
    except Exception as e:
        logger.warning("check-proxy: exception %s", e)
        return _errored(e)
def _proxy_from_env() -> str:
    """Return the proxy configured through environment variables, or "".

    Used when the login page supplied no proxy. ``HTTPS_PROXY`` /
    ``https_proxy`` take precedence over ``HTTP_PROXY`` / ``http_proxy``.
    """
    env = os.environ
    for upper, lower in (("HTTPS_PROXY", "https_proxy"), ("HTTP_PROXY", "http_proxy")):
        value = (env.get(upper) or env.get(lower) or "").strip()
        if value:
            return value
    return ""
# Fixed tunnel proxy (socks5h with username/password): used when neither the login
# page nor the environment variables provide a proxy.
# Same format as the requests example: socks5h://user:pwd@host:port/
TUNNEL_PROXY = (os.getenv("TUNNEL_PROXY") or "").strip() # e.g. 218.78.109.253:16816
TUNNEL_PROXY_USERNAME = (os.getenv("TUNNEL_PROXY_USERNAME") or "").strip()
TUNNEL_PROXY_PASSWORD = (os.getenv("TUNNEL_PROXY_PASSWORD") or "").strip()
# KDL ("Kuaidaili") proxy API (optional): proxies are fetched from this endpoint
# when no tunnel is configured.
KDL_PROXY_API_URL = (os.getenv("KDL_PROXY_API_URL") or "").strip()
KDL_PROXY_USERNAME = (os.getenv("KDL_PROXY_USERNAME") or "").strip()
KDL_PROXY_PASSWORD = (os.getenv("KDL_PROXY_PASSWORD") or "").strip()
# Log the proxy configuration once at import time (passwords are never printed).
def _log_proxy_config() -> None:
    """Log whether the tunnel and KDL proxy settings are completely configured."""
    tunnel_ready = all((TUNNEL_PROXY, TUNNEL_PROXY_USERNAME, TUNNEL_PROXY_PASSWORD))
    kdl_ready = all((KDL_PROXY_API_URL, KDL_PROXY_USERNAME, KDL_PROXY_PASSWORD))
    tunnel_shown = TUNNEL_PROXY or "(empty)"
    kdl_shown = "set" if KDL_PROXY_API_URL else "(empty)"
    logger.info(
        "proxy config: tunnel=%s (TUNNEL_PROXY=%s), kdl=%s (KDL_API=%s)",
        tunnel_ready,
        tunnel_shown,
        kdl_ready,
        kdl_shown,
    )


_log_proxy_config()
def _proxy_from_tunnel() -> str:
    """Build the fixed tunnel proxy URL ``socks5h://user:pwd@host:port/`` for the 7006 service.

    The tunnel credentials fall back to the KDL credentials. Returns "" when
    no tunnel host is configured or the credentials are incomplete.
    """
    if not TUNNEL_PROXY:
        return ""
    username = (TUNNEL_PROXY_USERNAME or KDL_PROXY_USERNAME or "").strip()
    password = (TUNNEL_PROXY_PASSWORD or KDL_PROXY_PASSWORD or "").strip()
    if username and password:
        return f"socks5h://{username}:{password}@{TUNNEL_PROXY}/"
    return ""
async def _proxy_from_kdl() -> str:
    """Fetch one proxy address from the KDL API as ``http://user:pwd@ip:port/`` for the 7006 service.

    Returns "" when the KDL settings are incomplete, the API returns an empty
    body, or the request fails (the failure is logged).
    """
    if not (KDL_PROXY_API_URL and KDL_PROXY_USERNAME and KDL_PROXY_PASSWORD):
        return ""
    try:
        async with httpx.AsyncClient(trust_env=False, timeout=10.0) as client:
            resp = await client.get(KDL_PROXY_API_URL)
        resp.raise_for_status()
        address = (resp.text or "").strip()
        if address:
            return f"http://{KDL_PROXY_USERNAME}:{KDL_PROXY_PASSWORD}@{address}/"
        return ""
    except Exception as e:
        logger.warning("KDL proxy fetch failed: %s", e)
        return ""
# When the tunnel proxy arrives with an http scheme (from the login page or from KDL),
# it is rewritten to socks5h before being passed to the 7006 service. This is the
# host:port identifying the tunnel (overridable through the environment).
TUNNEL_PROXY_NORMALIZE_HOST = (os.getenv("TUNNEL_PROXY_NORMALIZE_HOST") or "218.78.109.253:16816").strip()
def _proxy_preview_for_log(proxy: str) -> str:
    """Return a credential-masked form of a proxy URL that is safe to log.

    Blank or non-string input yields ``"(empty)"``. A URL without user-info
    is returned as is (capped at 50 characters). Otherwise the password is
    replaced by ``***`` and only ``host:port`` (capped at 40 characters) is
    kept after the ``@``.
    """
    text = proxy.strip() if isinstance(proxy, str) else ""
    if not text:
        return "(empty)"

    userinfo, at_sign, location = text.rpartition("@")
    if not at_sign:
        return text[:50]

    # Drop any path or query that follows host:port.
    location = location.rstrip("/").split("/")[0].split("?")[0]
    scheme, marker, creds = userinfo.partition("://")
    if marker:
        user = creds.split(":", 1)[0] if ":" in creds else "***"
        masked = f"{scheme}://{user}:***"
    else:
        masked = "***"
    return f"{masked}@{location[:40]}/"
def _normalize_proxy_scheme_to_socks5h(proxy: str) -> str:
    """Rewrite an ``http://`` tunnel proxy URL to ``socks5h://`` (the 7006 service needs socks5).

    Only URLs that carry user-info and whose ``host:port`` equals the
    configured tunnel (``TUNNEL_PROXY`` or ``TUNNEL_PROXY_NORMALIZE_HOST``)
    are rewritten; everything else is returned stripped but otherwise
    untouched.

    Args:
        proxy: Proxy URL, possibly blank.

    Returns:
        The normalised URL, or ``proxy`` itself when it is falsy or not a string.
    """
    if not proxy or not isinstance(proxy, str):
        return proxy
    p = proxy.strip()
    if not p.startswith("http://") or "@" not in p:
        return p
    # Split on the LAST "@" so a password containing "@" does not shift the host.
    host_part = p.rsplit("@", 1)[1].rstrip("/").split("/")[0].split("?")[0]
    # Ignore unset hosts so an empty host can never match an empty setting.
    tunnel_hosts = {h for h in (TUNNEL_PROXY, TUNNEL_PROXY_NORMALIZE_HOST) if h}
    if host_part not in tunnel_hosts:
        return p
    out = "socks5h://" + p[len("http://"):]
    logger.info("proxy normalize: http -> socks5h for tunnel %s", host_part)
    return out
def _resolve_proxy(body_proxy: str, *, allow_auto: bool = True) -> str:
    """Pick the proxy to pass to the 7006 service.

    Priority: request body > environment variables > fixed tunnel > (optional)
    KDL API. The last two are only considered when ``allow_auto`` is true and
    are reported as the sentinels ``"__tunnel__"`` / ``"__kdl__"`` for the
    caller to expand. Returns "" when nothing is configured.
    """
    explicit = (body_proxy or "").strip()
    if explicit:
        logger.debug("proxy resolve: from body, len=%s", len(explicit))
        return explicit

    from_env = _proxy_from_env()
    if from_env:
        logger.debug("proxy resolve: from env (HTTP_PROXY/HTTPS_PROXY), len=%s", len(from_env))
        return from_env

    if not allow_auto:
        return ""

    # Tunnel first: it only needs a host plus credentials, which may come from KDL_*.
    user = (TUNNEL_PROXY_USERNAME or KDL_PROXY_USERNAME or "").strip()
    password = (TUNNEL_PROXY_PASSWORD or KDL_PROXY_PASSWORD or "").strip()
    if TUNNEL_PROXY and user and password:
        logger.info("proxy resolve: auto -> tunnel (socks5h), TUNNEL_PROXY=%s", TUNNEL_PROXY)
        return "__tunnel__"

    if KDL_PROXY_API_URL and KDL_PROXY_USERNAME and KDL_PROXY_PASSWORD:
        logger.info("proxy resolve: auto -> kdl (fetch from API)")
        return "__kdl__"

    logger.debug("proxy resolve: no auto proxy configured")
    return ""
@app.post("/auth/wake")
async def wake_up_login(body: WakeUpRequest):
    """Wake-up login: call the upstream ``/login/WakeUpLogin`` only (QR-code logins only, no QR code is fetched).

    The proxy is resolved from the request body, the environment, the fixed
    tunnel or the KDL API, then normalised to socks5h where applicable.

    Raises:
        HTTPException: 400 when ``key`` is blank; 502 when the upstream cannot
            be reached or answers with a status >= 400.

    Returns:
        The upstream JSON body, or ``{"ok": True, "text": ...}`` when the
        upstream body is not JSON.
    """
    key = (body.key or "").strip()
    if not key:
        raise HTTPException(status_code=400, detail="key is required")

    # Expand the sentinels from _resolve_proxy and log the actual source once.
    proxy = _resolve_proxy(body.Proxy or "", allow_auto=True)
    if proxy == "__tunnel__":
        proxy = _proxy_from_tunnel()
        if proxy:
            logger.info("WakeUpLogin: using proxy from tunnel (socks5h), len=%s", len(proxy))
    elif proxy == "__kdl__":
        proxy = await _proxy_from_kdl()
        if proxy:
            logger.info("WakeUpLogin: using proxy from KDL API, len=%s", len(proxy))
    elif proxy:
        logger.info("WakeUpLogin: using proxy from body/env, len=%s", len(proxy))
    if not proxy:
        logger.info("WakeUpLogin: Proxy 为空,请在 .env 中配置 TUNNEL_PROXY 或 HTTP_PROXY/HTTPS_PROXY 或 KDL或登录页填写代理")
    proxy = _normalize_proxy_scheme_to_socks5h(proxy)

    ipad_ormac = (body.IpadOrmac or "").strip() or "ipad"
    payload = {
        "Check": body.Check,
        "IpadOrmac": ipad_ormac,
        "Proxy": proxy,
    }
    url = f"{WECHAT_UPSTREAM_BASE_URL.rstrip('/')}/login/WakeUpLogin"
    # The proxy URL embeds credentials, so only its masked preview is logged.
    logger.info(
        "WakeUpLogin 请求参数: key=%s, url=%s, Proxy=%s, Check=%s, IpadOrmac=%s",
        key, url, _proxy_preview_for_log(proxy), body.Check, ipad_ormac,
    )
    try:
        async with httpx.AsyncClient(trust_env=False, timeout=20.0) as client:
            resp = await client.post(url, params={"key": key}, json=payload)
    except Exception as exc:
        logger.exception("Error calling upstream WakeUpLogin: %s", exc)
        raise HTTPException(
            status_code=502,
            detail={"error": "upstream_connect_error", "detail": str(exc)},
        ) from exc

    if resp.status_code >= 400:
        body_preview = resp.text[:500]
        logger.warning("WakeUpLogin bad response: status=%s, body=%s", resp.status_code, body_preview)
        raise HTTPException(
            status_code=502,
            detail={"error": "upstream_bad_response", "status_code": resp.status_code, "body": body_preview},
        )
    try:
        data = resp.json()
    except Exception:
        # A non-JSON success body is passed back as a short text preview.
        data = {"ok": True, "text": resp.text[:200]}
    logger.info("WakeUpLogin success: status=%s", resp.status_code)
    return data
@app.post("/auth/qrcode")
async def get_login_qrcode(body: QrCodeRequest):
    """Fetch a login QR code from the upstream ``/login/GetLoginQrCodeNewDirect``.

    The proxy is resolved from the request body, the environment, the fixed
    tunnel or the KDL API. ``Check`` is always sent as False and the device is
    "mac" only when ``force_mac`` is set. The response's Data62 is cached per
    key (unmodified) and validation details are attached to the returned
    data under ``_data62_*`` keys.

    Raises:
        HTTPException: 400 when ``key`` is empty; 502 when the upstream cannot
            be reached, answers with a status >= 400, or returns a non-JSON body.
    """
    key = body.key
    if not key:
        raise HTTPException(status_code=400, detail="key is required")

    proxy = _resolve_proxy(body.Proxy or "", allow_auto=True)
    if proxy == "__tunnel__":
        proxy = _proxy_from_tunnel()
        if proxy:
            logger.info("GetLoginQrCodeNewDirect: using proxy from tunnel (socks5h), len=%s", len(proxy))
    elif proxy == "__kdl__":
        proxy = await _proxy_from_kdl()
        if proxy:
            logger.info("GetLoginQrCodeNewDirect: using proxy from KDL API, len=%s", len(proxy))
    proxy = _normalize_proxy_scheme_to_socks5h(proxy)
    if proxy:
        logger.info("GetLoginQrCodeNewDirect: proxy=yes, force_mac=%s, IpadOrmac=%s", body.force_mac, "mac" if body.force_mac else (body.IpadOrmac or "ipad"))
    else:
        logger.info("GetLoginQrCodeNewDirect: proxy=empty未配置则后端自动读 env/KDLforce_mac=%s", body.force_mac)

    payload = body.dict(exclude={"key", "force_mac"})
    payload["Check"] = False
    payload["IpadOrmac"] = "mac" if body.force_mac else ((body.IpadOrmac or "").strip() or "ipad")
    payload["Proxy"] = proxy
    url = f"{WECHAT_UPSTREAM_BASE_URL}/login/GetLoginQrCodeNewDirect"
    # The proxy URL embeds credentials, so only its masked preview is logged.
    logger.info(
        "GetLoginQrCodeNewDirect 请求参数: key=%s, url=%s, Proxy=%s, Check=%s, IpadOrmac=%s",
        key, url, _proxy_preview_for_log(proxy), False, payload["IpadOrmac"],
    )
    try:
        async with httpx.AsyncClient(trust_env=False, timeout=20.0) as client:
            resp = await client.post(url, params={"key": key}, json=payload)
    except Exception as exc:
        logger.exception("Error calling upstream GetLoginQrCodeNewDirect: %s", exc)
        raise HTTPException(
            status_code=502,
            detail={"error": "upstream_connect_error", "detail": str(exc)},
        ) from exc

    body_text = resp.text[:500]
    if resp.status_code >= 400:
        logger.warning(
            "Upstream GetLoginQrCodeNewDirect bad response: status=%s, body=%s",
            resp.status_code,
            body_text,
        )
        raise HTTPException(
            status_code=502,
            detail={
                "error": "upstream_bad_response",
                "status_code": resp.status_code,
                "body": body_text,
            },
        )
    # Report the real body length, not the length of the 500-character preview.
    logger.info(
        "Upstream GetLoginQrCodeNewDirect success: status=%s, body_len=%s",
        resp.status_code,
        len(resp.text),
    )
    try:
        data = resp.json()
    except ValueError as exc:
        # A non-JSON success body is an upstream fault, not an internal error.
        logger.warning("Upstream GetLoginQrCodeNewDirect returned non-JSON body: %s", body_text)
        raise HTTPException(
            status_code=502,
            detail={
                "error": "upstream_bad_response",
                "status_code": resp.status_code,
                "body": body_text,
            },
        ) from exc

    # Store the complete, unmodified Data62 (no cleaning or truncation); only an
    # integrity check is run so the frontend can show it in its operation log.
    try:
        data62_full = (data.get("Data62") or "").strip()
        if not data62_full and isinstance(data.get("Data"), dict):
            data62_full = (data.get("Data").get("Data62") or data.get("Data").get("data62") or "").strip()
        qrcode_store[key] = {"data62": data62_full, "response": data}
        data["_data62_stored"] = True
        data["_data62_length"] = len(data62_full)
        check = _validate_data62(data62_full)
        data["_data62_valid"] = check["valid"]
        data["_data62_check"] = check["message"]
        data["_data62_raw_length"] = check["raw_length"]
        data["_data62_clean_length"] = len(data62_full)
        data["_data62_preview"] = check["preview"]
        logger.info(
            "Stored Data62 (full) for key=%s (len=%s), valid=%s, check=%s",
            key, len(data62_full), check["valid"], check["message"],
        )
        logger.info("Data62 full: %s", data62_full)
    except Exception as e:
        # Caching is best-effort: the upstream response is still returned.
        logger.warning("Store qrcode data for key=%s failed: %s", key, e)
    return data
@app.get("/auth/status")
async def get_online_status(
    key: str = Query(..., description="账号唯一标识"),
):
    """Return the upstream ``/login/GetLoginStatus`` result for an account key.

    Raises:
        HTTPException: 400 when ``key`` is empty; 502 when the upstream cannot
            be reached.

    Returns:
        The upstream JSON body, or ``{"Code": <status>, "Text": <body preview>}``
        when the upstream body is not JSON.
    """
    if not key:
        raise HTTPException(status_code=400, detail="key is required")
    url = f"{WECHAT_UPSTREAM_BASE_URL}/login/GetLoginStatus"
    logger.info("GetLoginStatus: key=%s, url=%s", key, url)
    try:
        async with httpx.AsyncClient(trust_env=False, timeout=15.0) as client:
            resp = await client.get(url, params={"key": key})
    except Exception as exc:
        logger.exception("Error calling upstream GetLoginStatus: %s", exc)
        raise HTTPException(status_code=502, detail=f"upstream_error: {exc}") from exc
    body_text = resp.text[:500]
    logger.info(
        "Upstream GetLoginStatus response: status=%s, body=%s",
        resp.status_code,
        body_text,
    )
    try:
        return resp.json()
    except ValueError:
        # Degrade to a small status object instead of an unhandled 500.
        logger.warning("Upstream GetLoginStatus returned non-JSON body")
        return {"Code": resp.status_code, "Text": body_text or "Non-JSON response"}
def _extract_clean_ticket(obj: dict) -> Optional[str]:
    """Extract the ticket from a scan-status response, dropping trailing garbage.

    The ticket is looked up under ``Data.ticket``, ``ticket`` and ``Ticket``,
    and finally in the ``ticket=`` parameter of ``wechat_verify_url``. Only
    the leading run of printable ASCII characters (codes 32..126) is kept.

    Returns:
        The cleaned ticket, or None when there is none or nothing printable
        remains.
    """
    if not obj or not isinstance(obj, dict):
        return None

    nested = obj.get("Data")
    holder = nested if isinstance(nested, dict) else obj
    raw = (holder.get("ticket") if holder else None) or obj.get("ticket") or obj.get("Ticket")
    if not raw:
        verify_url = obj.get("wechat_verify_url") or ""
        if isinstance(verify_url, str) and "ticket=" in verify_url:
            raw = verify_url.split("ticket=", 1)[1].split("&")[0]
    if not raw or not isinstance(raw, str):
        return None

    # Cut at the first character outside printable ASCII (this covers U+FFFD too).
    cut = next((i for i, ch in enumerate(raw) if not 32 <= ord(ch) <= 126), len(raw))
    return raw[:cut] or None
def _clean_data62(s: str) -> str:
    """Strip trailing garbage from a Data62 string.

    Garbage is taken to start at a ``d000...`` marker; the text before the
    first match is kept. Markers are tried from the most specific to the
    least specific, and the first marker that occurs decides the cut.
    """
    if not s or not isinstance(s, str):
        return ""
    text = s.strip()
    markers = (
        "d0000000000000101000000000000000d0000000000000000000000000000007f",
        "d0000000000000101",
        "d00000000",
    )
    for marker in markers:
        head, found, _tail = text.partition(marker)
        if found:
            return head.strip()
    return text
def _validate_data62(raw: str) -> dict:
    """Check whether a complete, raw Data62 string looks valid.

    Nothing is cleaned or truncated; only the format (hexadecimal) and the
    minimum length (32) are checked.

    Returns:
        A dict with ``valid``, ``message``, ``raw_length``, ``clean_length``
        (same as ``raw_length``) and ``preview`` (at most 80 characters).
    """
    text = (raw or "").strip()
    size = len(text)
    minimum = 32
    hex_chars = set("0123456789abcdefABCDEF")

    ok = False
    if not text:
        verdict = "无 Data62 或为空"
    elif not set(text) <= hex_chars:
        verdict = "非十六进制格式"
    elif size < minimum:
        verdict = f"长度不足({size} < {minimum}"
    else:
        verdict = "完整有效"
        ok = True

    return {
        "valid": ok,
        "message": verdict,
        "raw_length": size,
        "clean_length": size,
        "preview": text[:80],
    }
@app.get("/auth/scan-status")
async def check_scan_status(
    key: str = Query(..., description="账号唯一标识"),
):
    """Poll the upstream ``/login/CheckLoginStatus`` for a QR-code scan result.

    A non-JSON or non-object upstream body is wrapped as
    ``{"Code": <status>, "Text": <preview>}``. When the response carries a
    ticket, a ``slider_url`` pointing at the local slider form is attached,
    built from the slider key, the cleaned ticket and the Data62 (cached from
    the QR-code step, or taken from this response).

    Raises:
        HTTPException: 400 when ``key`` is empty; 502 when the upstream cannot
            be reached.
    """
    if not key:
        raise HTTPException(status_code=400, detail="key is required")
    url = f"{CHECK_STATUS_BASE_URL}/login/CheckLoginStatus"
    logger.info("CheckLoginStatus: key=%s, url=%s", key, url)
    try:
        async with httpx.AsyncClient(trust_env=False, timeout=15.0) as client:
            resp = await client.get(url, params={"key": key})
    except Exception as exc:
        logger.exception("Error calling upstream CheckLoginStatus: %s", exc)
        raise HTTPException(status_code=502, detail=f"upstream_error: {exc}") from exc

    raw_body = resp.text
    logger.info(
        "Upstream CheckLoginStatus response: status=%s, body=%s",
        resp.status_code,
        raw_body[:2000],
    )
    try:
        data = resp.json() if raw_body.strip() else {}
    except Exception:
        data = {"Code": resp.status_code, "Text": raw_body[:500] or "Non-JSON response"}
    if not isinstance(data, dict):
        data = {"Code": resp.status_code, "Text": str(data)[:500]}

    ticket = _extract_clean_ticket(data)
    if not ticket:
        return data

    # Data62 is used in full: first from the cache filled by the QR-code step,
    # then from this response.
    cached = qrcode_store.get(key) or {}
    data62 = (cached.get("data62") or "").strip()
    if not data62:
        inner = data.get("Data") or {}
        data62 = (data.get("Data62") or inner.get("Data62") or inner.get("data62") or "").strip()

    params = {"key": SLIDER_VERIFY_KEY, "ticket": ticket}
    if data62:
        params["data62"] = data62
    data["slider_url"] = f"/auth/slider-form?{urlencode(params)}"
    logger.info(
        "Attached slider_url (slider-form) for key=%s (ticket len=%s, data62 len=%s)",
        key,
        len(ticket),
        len(data62),
    )
    return data
def _slider_form_html(key_val: str, data62_val: str, ticket_val: str) -> str:
"""本地滑块验证页:与 7765 相同 DOM 结构(#app、keyInput、data62Input、originalTicketInput加载 7765 的 module 脚本,不用 iframe。"""
k = html.escape(key_val, quote=True)
d = html.escape(data62_val, quote=True)
t = html.escape(ticket_val, quote=True)
# 脚本走本机代理 /auth/slider-assets/...,避免跨域加载 7765 被 CORS 拦截
script_src = "/auth/slider-assets/N_jYM_2V.js"
return f"""<!DOCTYPE html>
<html lang="zh-CN">
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1">
<title>滑块验证</title>
<link href="https://cdn.jsdelivr.net/npm/bootstrap@5.3/dist/css/bootstrap.min.css" rel="stylesheet" crossorigin="anonymous">
</head>
<body>
<div id="app" data-v-app="">
<div data-v-220db9a4="" class="app-container">
<div data-v-220db9a4="" class="card">
<div data-v-220db9a4="" class="card-header">
<h5 data-v-220db9a4="" class="mb-0">滑块验证</h5>
</div>
<div data-v-220db9a4="" class="card-body">
<div data-v-220db9a4="" class="params-section mb-4">
<div data-v-220db9a4="" class="row">
<div data-v-220db9a4="" class="col-12 col-md-6 mb-3">
<label data-v-220db9a4="" for="keyInput" class="form-label">Key:</label>
<input data-v-220db9a4="" type="text" class="form-control" id="keyInput" placeholder="请输入key" value="{k}">
</div>
<div data-v-220db9a4="" class="col-12 col-md-6 mb-3">
<label data-v-220db9a4="" for="data62Input" class="form-label">Data62:</label>
<input data-v-220db9a4="" type="text" class="form-control" id="data62Input" placeholder="请输入data62" value="{d}">
</div>
<div data-v-220db9a4="" class="col-12 mb-3">
<label data-v-220db9a4="" for="originalTicketInput" class="form-label">Original Ticket:</label>
<input data-v-220db9a4="" type="text" class="form-control" id="originalTicketInput" placeholder="请输入original_ticket" value="{t}">
</div>
</div>
</div>
<div data-v-220db9a4="" class="text-center mb-3">
<button data-v-220db9a4="" type="button" class="btn btn-verify btn-lg" disabled="">开始验证</button>
<div data-v-220db9a4="" class="text-muted mt-2">请先填写完整的参数信息</div>
</div>
</div>
</div>
</div>
</div>
<script type="module" crossorigin src="{script_src}"></script>
</body>
</html>"""
@app.get("/auth/slider-assets/{path:path}")
async def slider_asset_proxy(path: str):
    """Proxy a static asset from the slider service's ``/assets/`` directory.

    Serving the file from this origin lets the slider page load the module
    script (e.g. ``N_jYM_2V.js``) without a cross-origin request.

    Raises:
        HTTPException: 400 when ``path`` contains a ``..`` segment, the
            upstream status when it is >= 400, or 502 on transport errors.
    """
    # The path is caller-controlled. A ".." segment would be collapsed during
    # URL normalisation and let the request leave the /assets/ prefix, so
    # refuse it outright.
    if ".." in path.replace("\\", "/").split("/"):
        raise HTTPException(status_code=400, detail="invalid asset path")

    url = f"{SLIDER_VERIFY_BASE_URL.rstrip('/')}/assets/{path}"
    try:
        async with httpx.AsyncClient(trust_env=False, timeout=15.0) as client:
            resp = await client.get(url)
        if resp.status_code >= 400:
            raise HTTPException(status_code=resp.status_code, detail=resp.text[:200])
        media_type = "application/javascript" if path.endswith(".js") else "application/octet-stream"
        return Response(
            content=resp.content,
            media_type=media_type,
            headers={"Cache-Control": "no-store, no-cache, must-revalidate", "Pragma": "no-cache"},
        )
    except HTTPException:
        # Keep deliberate HTTP errors (400 / upstream status) as they are.
        raise
    except Exception as e:
        logger.warning("Slider asset proxy error: %s", e)
        raise HTTPException(status_code=502, detail=str(e)) from e
@app.get("/auth/slider-form", response_class=HTMLResponse)
async def slider_form(
    key: str = Query(..., description="Key提交到第三方滑块"),
    data62: str = Query("", description="Data62"),
    ticket: str = Query(..., description="Original Ticket"),
):
    """Serve the local slider-verification page pre-filled from the query string.

    ``data62`` is stripped of surrounding whitespace; ``key`` and ``ticket``
    are passed through unchanged to ``_slider_form_html``.
    """
    page = _slider_form_html(key, data62.strip(), ticket)
    return HTMLResponse(content=page)
# ---------- Slider verification submit endpoints (proxy to the slider service) ----------
# The slider page submits via GET to SLIDER_VERIFY_BASE_URL with the
# parameters key, data62 and original_ticket.
@app.get("/api/slider-verify")
async def api_slider_verify_get(
    key: str = Query(..., description="Key"),
    data62: str = Query("", description="Data62"),
    original_ticket: str = Query("", description="Original Ticket与 ticket 二选一)"),
    ticket: str = Query("", description="Original Ticket与 original_ticket 二选一)"),
):
    """Forward a slider submission to the slider service via GET.

    Either ``original_ticket`` or ``ticket`` must be supplied. Both are
    optional at the query level so that a request carrying only ``ticket``
    reaches the check below instead of being rejected by parameter validation.

    Returns:
        The upstream JSON body, or ``{"ok", "status_code", "text"}`` when the
        upstream body is not JSON.

    Raises:
        HTTPException: 400 when no ticket is given, 502 on upstream errors.
    """
    ticket_val = original_ticket or ticket
    if not ticket_val:
        raise HTTPException(status_code=400, detail="original_ticket or ticket required")
    url = SLIDER_VERIFY_BASE_URL.rstrip("/") + "/"
    params = {"key": key, "data62": (data62 or "").strip(), "original_ticket": ticket_val}
    try:
        async with httpx.AsyncClient(trust_env=False, timeout=30.0) as client:
            resp = await client.get(url, params=params)
        # Return the upstream body, parsed when it is JSON.
        try:
            return resp.json()
        except Exception:
            return {"ok": resp.status_code == 200, "status_code": resp.status_code, "text": resp.text[:500]}
    except Exception as e:
        logger.warning("Slider verify upstream error: %s", e)
        raise HTTPException(status_code=502, detail=f"slider_upstream_error: {e}") from e
class SliderVerifyBody(BaseModel):
    # JSON body accepted by POST /api/slider-verify.
    key: str  # forwarded upstream as the "key" query parameter
    data62: Optional[str] = ""  # stripped, then forwarded as "data62"
    original_ticket: Optional[str] = None  # preferred ticket field
    ticket: Optional[str] = None  # used when original_ticket is empty
@app.post("/api/slider-verify")
async def api_slider_verify_post(body: SliderVerifyBody):
    """Forward a slider submission received as a POST body.

    The body is converted into query parameters and sent upstream with GET.
    The upstream JSON is returned as-is; a non-JSON reply is summarised as
    ``{"ok", "status_code", "text"}``.

    Raises:
        HTTPException: 400 when neither ticket field is set, 502 on upstream errors.
    """
    chosen_ticket = body.original_ticket or body.ticket
    if not chosen_ticket:
        raise HTTPException(status_code=400, detail="original_ticket or ticket required")

    target = SLIDER_VERIFY_BASE_URL.rstrip("/") + "/"
    query = {
        "key": body.key,
        "data62": (body.data62 or "").strip(),
        "original_ticket": chosen_ticket,
    }
    try:
        async with httpx.AsyncClient(trust_env=False, timeout=30.0) as client:
            upstream = await client.get(target, params=query)
        try:
            return upstream.json()
        except Exception:
            return {
                "ok": upstream.status_code == 200,
                "status_code": upstream.status_code,
                "text": upstream.text[:500],
            }
    except Exception as e:
        logger.warning("Slider verify upstream error: %s", e)
        raise HTTPException(status_code=502, detail=f"slider_upstream_error: {e}") from e
class VerifyCodeBody(BaseModel):
    """iPad login verification code: input of the upstream /login/VerifyCode call."""
    code: str = ""  # verification code; stripped before forwarding
    data62: str = ""  # stripped before forwarding
    ticket: str = ""  # stripped before forwarding
@app.post("/api/verify-code")
async def api_verify_code(key: str = Query(..., description="账号 key"), body: VerifyCodeBody = None):
    """Forward an iPad login verification code to the upstream ``/login/VerifyCode``.

    ``key`` is sent as a query parameter; ``code``, ``data62`` and ``ticket``
    are stripped and sent as the JSON body. A missing body is treated as all
    fields empty.

    Raises:
        HTTPException: 502 when the upstream request fails.
    """
    form = VerifyCodeBody() if body is None else body
    endpoint = f"{CHECK_STATUS_BASE_URL.rstrip('/')}/login/VerifyCode"
    payload = {
        "code": (form.code or "").strip(),
        "data62": (form.data62 or "").strip(),
        "ticket": (form.ticket or "").strip(),
    }
    try:
        async with httpx.AsyncClient(trust_env=False, timeout=30.0) as client:
            upstream = await client.post(endpoint, params={"key": key}, json=payload)
        try:
            return upstream.json()
        except Exception:
            return {
                "ok": upstream.status_code == 200,
                "status_code": upstream.status_code,
                "text": (upstream.text or "")[:500],
            }
    except Exception as e:
        logger.warning("VerifyCode upstream error: %s", e)
        raise HTTPException(status_code=502, detail=f"verify_code_upstream_error: {e}") from e
# ---------- R1-2 customer profiles / R1-3 scheduled greetings / R1-4 segmented push / messages and sending ----------
class CustomerCreate(BaseModel):
    # Body of POST /api/customers; every field is handed to store.upsert_customer.
    key: str  # account key
    wxid: str  # customer's WeChat id
    remark_name: Optional[str] = ""
    region: Optional[str] = ""
    age: Optional[str] = ""
    gender: Optional[str] = ""
    level: Optional[str] = ""  # purchase tier
    tags: Optional[List[str]] = None
class GreetingTaskCreate(BaseModel):
    # Body of POST /api/greeting-tasks.
    key: str  # account key
    name: str
    send_time: str  # ISO-format trigger time, e.g. 2026-03-11T14:30:00; must be in the future
    customer_tags: Optional[List[str]] = None  # stored as [] when omitted
    template: str
    use_qwen: Optional[bool] = False
class ProductTagCreate(BaseModel):
    # Body of POST /api/product-tags.
    key: str  # account key
    name: str  # tag name
class PushGroupCreate(BaseModel):
    # Body of POST /api/push-groups.
    key: str  # account key
    name: str
    customer_ids: Optional[List[str]] = None  # stored as [] when omitted
    tag_ids: Optional[List[str]] = None  # stored as [] when omitted
class PushTaskCreate(BaseModel):
    # Body of POST /api/push-tasks; fields are passed positionally to store.create_push_task.
    key: str  # account key
    product_tag_id: str
    group_id: str
    content: str
    send_at: Optional[str] = None  # format not validated here
class SendMessageBody(BaseModel):
    # Body of POST /api/send-message.
    key: str  # account key
    to_user_name: str  # recipient, sent upstream as ToUserName
    content: str  # text, sent upstream as TextContent
class BatchSendItem(BaseModel):
    # One recipient/content pair inside BatchSendBody.items.
    to_user_name: str
    content: str
class BatchSendBody(BaseModel):
    # Body of POST /api/send-batch.
    key: str  # account key
    items: List[BatchSendItem]
class SendImageBody(BaseModel):
    # Body of POST /api/send-image.
    key: str  # account key
    to_user_name: str
    image_content: str  # image as base64 or URL, per the upstream convention
    text_content: Optional[str] = ""
    at_wxid_list: Optional[List[str]] = None
class QwenGenerateBody(BaseModel):
    # Request body with a prompt and an optional system message.
    # NOTE(review): the consuming endpoint is outside this section — confirm usage.
    prompt: str
    system: Optional[str] = None
@app.get("/api/customers")
async def api_list_customers(key: str = Query(..., description="账号 key")):
    """Return the customer profiles stored for the given account key."""
    customers = store.list_customers(key)
    return {"items": customers}
@app.post("/api/customers")
async def api_upsert_customer(body: CustomerCreate):
    """Create or update a customer profile and return the stored row.

    Optional text fields that are ``None`` are stored as empty strings;
    ``tags`` is passed through unchanged.
    """
    text_fields = {
        "remark_name": body.remark_name or "",
        "region": body.region or "",
        "age": body.age or "",
        "gender": body.gender or "",
        "level": body.level or "",
    }
    return store.upsert_customer(body.key, body.wxid, tags=body.tags, **text_fields)
@app.get("/api/customers/{customer_id}")
async def api_get_customer(customer_id: str):
    """Return one customer profile, or respond 404 when the store has none."""
    customer = store.get_customer(customer_id)
    if customer:
        return customer
    raise HTTPException(status_code=404, detail="customer not found")
@app.get("/api/customer-tags")
async def api_list_customer_tags(key: str = Query(..., description="账号 key")):
    """Return the tags reported by ``store.list_customer_tags`` for this key.

    Intended for dropdowns such as the scheduled-task tag selector.
    """
    tags = store.list_customer_tags(key)
    return {"tags": tags}
@app.delete("/api/customers/{customer_id}")
async def api_delete_customer(customer_id: str):
    """Delete a customer profile; respond 404 when the store reports nothing deleted."""
    deleted = store.delete_customer(customer_id)
    if deleted:
        return {"ok": True}
    raise HTTPException(status_code=404, detail="customer not found")
@app.get("/api/greeting-tasks")
async def api_list_greeting_tasks(key: str = Query(..., description="账号 key")):
    """Return the greeting tasks stored for the given account key."""
    tasks = store.list_greeting_tasks(key)
    return {"items": tasks}
def _parse_send_time(s: str) -> Optional[datetime]:
"""解析 ISO 时间字符串,返回 datetime无时区"""
try:
if "T" in s:
return datetime.fromisoformat(s.replace("Z", "+00:00")[:19])
return datetime.strptime(s[:19], "%Y-%m-%d %H:%M:%S")
except Exception:
return None
@app.post("/api/greeting-tasks")
async def api_create_greeting_task(body: GreetingTaskCreate):
    """Create a scheduled greeting task after validating its trigger time.

    Raises:
        HTTPException: 400 when ``send_time`` cannot be parsed or is not
            later than the current time.
    """
    trigger_at = _parse_send_time(body.send_time)
    if not trigger_at:
        raise HTTPException(status_code=400, detail="触发时间格式无效,请使用 日期+时分秒 选择器")
    if trigger_at <= datetime.now():
        raise HTTPException(status_code=400, detail="触发时间必须是未来时间,请重新选择")

    # The original send_time string (not the parsed value) is what gets stored.
    return store.create_greeting_task(
        body.key,
        body.name,
        body.send_time,
        customer_tags=body.customer_tags or [],
        template=body.template,
        use_qwen=body.use_qwen or False,
    )
@app.patch("/api/greeting-tasks/{task_id}")
async def api_update_greeting_task(task_id: str, body: dict):
    """Partially update a greeting task.

    Only ``name``, ``send_time``, ``customer_tags``, ``template``,
    ``use_qwen`` and ``enabled`` are forwarded to the store; other keys are
    ignored. A supplied ``send_time`` must parse and be in the future.

    Raises:
        HTTPException: 400 for an invalid or past ``send_time``, 404 when the
            store returns no row.
    """
    if "send_time" in body:
        trigger_at = _parse_send_time(body["send_time"])
        if not trigger_at:
            raise HTTPException(status_code=400, detail="触发时间格式无效")
        if trigger_at <= datetime.now():
            raise HTTPException(status_code=400, detail="触发时间必须是未来时间")

    editable = ("name", "send_time", "customer_tags", "template", "use_qwen", "enabled")
    changes = {field: value for field, value in body.items() if field in editable}
    updated = store.update_greeting_task(task_id, **changes)
    if not updated:
        raise HTTPException(status_code=404, detail="task not found")
    return updated
@app.delete("/api/greeting-tasks/{task_id}")
async def api_delete_greeting_task(task_id: str):
    """Delete a greeting task; respond 404 when the store reports nothing deleted."""
    removed = store.delete_greeting_task(task_id)
    if removed:
        return {"ok": True}
    raise HTTPException(status_code=404, detail="task not found")
@app.get("/api/product-tags")
async def api_list_product_tags(key: str = Query(..., description="账号 key")):
    """Return the product tags stored for the given account key."""
    tags = store.list_product_tags(key)
    return {"items": tags}
@app.post("/api/product-tags")
async def api_create_product_tag(body: ProductTagCreate):
    """Create a product tag and return whatever the store returns for it."""
    created = store.create_product_tag(body.key, body.name)
    return created
@app.delete("/api/product-tags/{tag_id}")
async def api_delete_product_tag(tag_id: str):
    """Delete a product tag; respond 404 when the store reports nothing deleted."""
    removed = store.delete_product_tag(tag_id)
    if removed:
        return {"ok": True}
    raise HTTPException(status_code=404, detail="tag not found")
@app.get("/api/push-groups")
async def api_list_push_groups(key: str = Query(..., description="账号 key")):
    """Return the push groups stored for the given account key."""
    groups = store.list_push_groups(key)
    return {"items": groups}
@app.post("/api/push-groups")
async def api_create_push_group(body: PushGroupCreate):
    """Create a push group; omitted id lists are stored as empty lists."""
    customer_ids = body.customer_ids or []
    tag_ids = body.tag_ids or []
    return store.create_push_group(body.key, body.name, customer_ids, tag_ids)
@app.patch("/api/push-groups/{group_id}")
async def api_update_push_group(group_id: str, body: dict):
    """Update a push group's ``name``, ``customer_ids`` and ``tag_ids``.

    Keys missing from ``body`` are passed to the store as ``None``.

    Raises:
        HTTPException: 404 when the store returns no row.
    """
    fields = {field: body.get(field) for field in ("name", "customer_ids", "tag_ids")}
    updated = store.update_push_group(group_id, **fields)
    if not updated:
        raise HTTPException(status_code=404, detail="group not found")
    return updated
@app.delete("/api/push-groups/{group_id}")
async def api_delete_push_group(group_id: str):
    """Delete a push group; respond 404 when the store reports nothing deleted."""
    removed = store.delete_push_group(group_id)
    if removed:
        return {"ok": True}
    raise HTTPException(status_code=404, detail="group not found")
@app.get("/api/push-tasks")
async def api_list_push_tasks(key: str = Query(..., description="账号 key"), limit: int = Query(100, le=500)):
    """Return up to ``limit`` push tasks for the given account key."""
    tasks = store.list_push_tasks(key, limit=limit)
    return {"items": tasks}
@app.post("/api/push-tasks")
async def api_create_push_task(body: PushTaskCreate):
    """Create a push task and return whatever the store returns for it."""
    created = store.create_push_task(
        body.key,
        body.product_tag_id,
        body.group_id,
        body.content,
        body.send_at,
    )
    return created
@app.get("/api/messages")
async def api_list_messages(key: str = Query(..., description="账号 key"), limit: int = Query(100, le=500)):
    """List synced messages with display names resolved from the contact index.

    Each message gains ``FromDisplayName`` and ``ToDisplayName``: the
    contact's nickname when the wxid is in the contact index, otherwise the
    wxid itself. Customer-profile remarks are not used.

    Building the contact index is best-effort: on failure the messages are
    still returned with raw wxids, and the failure is logged rather than
    silently dropped.
    """
    items = store.list_sync_messages(key, limit=limit)
    try:
        contact_index = await _build_contact_index(key)
    except Exception as exc:
        logger.warning("api_list_messages: contact index unavailable, using raw wxids: %s", exc)
        contact_index = {}

    def display_name(wxid: str) -> str:
        """Return the nickname for ``wxid`` or ``wxid`` itself when unknown."""
        info = contact_index.get(wxid)
        if isinstance(info, dict) and info:
            return (info.get("nick_name") or wxid).strip()
        return wxid

    for m in items:
        from_wxid = (m.get("FromUserName") or m.get("from") or "").strip()
        to_wxid = (m.get("ToUserName") or m.get("to") or "").strip()
        m["FromDisplayName"] = display_name(from_wxid)
        m["ToDisplayName"] = display_name(to_wxid)
    return {"items": items}
@app.get("/api/callback-status")
async def api_callback_status(key: Optional[str] = Query(None, description="账号 key传入时会向 7006 重新注册 SetCallback 并返回是否成功")):
    """Report the message-callback configuration.

    Returns whether ``CALLBACK_BASE_URL`` is set, the resulting callback URL
    (or ``None``), and, only when a non-blank ``key`` is given, the result of
    re-registering the callback for that key (``None`` otherwise).
    """
    if CALLBACK_BASE_URL:
        callback_url = f"{CALLBACK_BASE_URL.rstrip('/')}/api/callback/wechat-message"
    else:
        callback_url = ""

    registered: Optional[bool] = None
    account_key = key.strip() if key else ""
    if account_key:
        registered = await _register_message_callback(account_key)

    return {
        "configured": bool(CALLBACK_BASE_URL),
        "callback_url": callback_url or None,
        "registered": registered,
    }
@app.post("/api/callback/wechat-message")
async def api_callback_wechat_message(request: Request, key: Optional[str] = Query(None, description="账号 key7006 回调时可能带在 query")):
    """Receive a real-time message callback and hand it to ``_on_ws_message``.

    The account key is taken from the query string, else from the body's
    ``key``/``Key``. Supported body shapes:

    * ``{"key": ..., "message": {...} | [...], "type": "message"}``
    * ``{"key": ..., "Data": {...} | [...]}`` (``data`` also accepted)

    Single messages and message lists are normalised through
    ``_normalize_callback_message``; anything else is forwarded unchanged.
    The endpoint always answers HTTP 200: ``{"ok": False, ...}`` when no key
    can be determined, ``{"ok": True}`` otherwise, even if processing failed
    (the failure is logged).
    """
    try:
        body = await request.json()
    except Exception:
        body = {}
    # Log the raw callback content for troubleshooting (truncated to bound log size).
    try:
        logger.info("callback/wechat-message raw body: %s", str(body)[:1000])
    except Exception:
        pass

    # The body may be any JSON value (e.g. a list); only read the key from a
    # dict so a non-dict payload without a query key is reported as
    # "missing key" instead of raising AttributeError.
    body_key = ""
    if isinstance(body, dict):
        body_key = body.get("key") or body.get("Key") or ""
    k = str(key or body_key or "").strip()
    if not k:
        logger.warning("callback/wechat-message: missing key in query and body")
        return JSONResponse(content={"ok": False, "error": "missing key"}, status_code=200)

    # Persist the raw body for later tracing and statistics.
    try:
        store.append_callback_log(k, body if isinstance(body, dict) else {"raw": str(body)})
    except Exception as le:
        logger.warning("callback_log append failed: %s", le)

    try:
        payload: Any = body
        if isinstance(body, dict) and body.get("message") is not None:
            msg = body.get("message")
            if isinstance(msg, list):
                # "message" is an array: normalise each dict entry.
                normalized_list = []
                for m in msg:
                    if isinstance(m, dict):
                        n = _normalize_callback_message({"message": m})
                        if n:
                            normalized_list.append(n)
                if normalized_list:
                    payload = normalized_list
            else:
                normalized = _normalize_callback_message(body)
                if normalized:
                    payload = [normalized]
        elif isinstance(body, dict):
            inner = body.get("Data") or body.get("data")
            if isinstance(inner, dict) and (inner.get("from_user_name") is not None or inner.get("FromUserName") is not None):
                # Data/data holds a single message: normalise it so
                # FromUserName/Content etc. use the unified field names.
                normalized = _normalize_callback_message({"message": inner})
                if normalized:
                    payload = [normalized]
            elif isinstance(inner, list):
                payload = inner
        _on_ws_message(k, payload)
        logger.info("callback message saved to sync_messages, key=%s", k[:8] + "..." if len(k) > 8 else k)
    except Exception as e:
        logger.exception("callback/wechat-message key=%s: %s", k[-4:] if len(k) >= 4 else "****", e)
    return {"ok": True}
async def _send_message_upstream(key: str, to_user_name: str, content: str) -> dict:
    """Send one text message through the upstream API.

    On success the message is recorded via ``store.append_sent_message`` and
    the upstream JSON is returned (or ``{"ok": True, "raw": ...}`` when the
    body is not JSON).

    Raises:
        HTTPException: 502 when the upstream status is >= 400.
    """
    endpoint = f"{WECHAT_UPSTREAM_BASE_URL.rstrip('/')}{SEND_MSG_PATH}"
    message = {"ToUserName": to_user_name, "MsgType": 1, "TextContent": content}
    async with httpx.AsyncClient(trust_env=False, timeout=15.0) as client:
        resp = await client.post(endpoint, params={"key": key}, json={"MsgItem": [message]})

    if resp.status_code >= 400:
        body_preview = resp.text[:400] if resp.text else ""
        logger.warning("Send message upstream %s: %s", resp.status_code, body_preview)
        raise HTTPException(
            status_code=502,
            detail=f"upstream_returned_{resp.status_code}: {body_preview}",
        )

    store.append_sent_message(key, to_user_name, content)
    try:
        return resp.json()
    except Exception:
        return {"ok": True, "raw": resp.text[:500]}
async def _send_batch_upstream(key: str, items: List[dict]) -> dict:
    """Send several text messages in a single upstream request.

    Each item may use ``to_user_name``/``content`` or
    ``ToUserName``/``TextContent``. Items without a recipient are dropped.
    After a successful call every sent item is recorded via
    ``store.append_sent_message``.

    Raises:
        HTTPException: 400 when no item has a recipient, 502 when the
            upstream status is >= 400.
    """
    endpoint = f"{WECHAT_UPSTREAM_BASE_URL.rstrip('/')}{SEND_MSG_PATH}"

    outgoing = []
    for entry in items:
        recipient = (entry.get("to_user_name") or entry.get("ToUserName") or "").strip()
        text = (entry.get("content") or entry.get("TextContent") or "").strip()
        if recipient:
            outgoing.append({"ToUserName": recipient, "MsgType": 1, "TextContent": text})
    if not outgoing:
        raise HTTPException(status_code=400, detail="items 中至少需要一条有效 to_user_name 与 content")

    async with httpx.AsyncClient(trust_env=False, timeout=30.0) as client:
        resp = await client.post(endpoint, params={"key": key}, json={"MsgItem": outgoing})

    if resp.status_code >= 400:
        body_preview = resp.text[:400] if resp.text else ""
        logger.warning("Batch send upstream %s: %s", resp.status_code, body_preview)
        raise HTTPException(
            status_code=502,
            detail=f"upstream_returned_{resp.status_code}: {body_preview}",
        )

    for sent in outgoing:
        store.append_sent_message(key, sent["ToUserName"], sent.get("TextContent", ""))
    try:
        return resp.json()
    except Exception:
        return {"ok": True, "sent": len(outgoing), "raw": resp.text[:500]}
async def _send_image_upstream(key: str, to_user_name: str, image_content: str,
                               text_content: Optional[str] = "",
                               at_wxid_list: Optional[List[str]] = None) -> dict:
    """Send one image message through the upstream image endpoint.

    The single ``MsgItem`` carries ``ImageContent``, ``MsgType``
    (``IMAGE_MSG_TYPE``), ``TextContent``, ``ToUserName`` and ``AtWxIDList``.
    On success a ``"[图片]"`` entry (plus the text, if any) is recorded via
    ``store.append_sent_message``.

    Raises:
        HTTPException: 502 when the upstream status is >= 400.
    """
    endpoint = f"{SEND_IMAGE_UPSTREAM_BASE_URL.rstrip('/')}{SEND_IMAGE_PATH}"
    mentions = list(at_wxid_list) if at_wxid_list else []
    payload = {
        "MsgItem": [
            {
                "AtWxIDList": mentions,
                "ImageContent": image_content or "",
                "MsgType": IMAGE_MSG_TYPE,
                "TextContent": text_content or "",
                "ToUserName": to_user_name,
            }
        ]
    }
    async with httpx.AsyncClient(trust_env=False, timeout=15.0) as client:
        resp = await client.post(endpoint, params={"key": key}, json=payload)

    if resp.status_code >= 400:
        body_preview = resp.text[:400] if resp.text else ""
        logger.warning("Send image upstream %s: %s", resp.status_code, body_preview)
        raise HTTPException(
            status_code=502,
            detail=f"upstream_returned_{resp.status_code}: {body_preview}",
        )

    caption = (" " + text_content) if text_content else ""
    store.append_sent_message(key, to_user_name, "[图片]" + caption)
    try:
        return resp.json()
    except Exception:
        return {"ok": True, "raw": resp.text[:500]}
@app.post("/api/send-message")
async def api_send_message(body: SendMessageBody):
    """Send a single text message.

    ``HTTPException`` raised by the helper is passed through unchanged; any
    other failure is logged and reported as 502.
    """
    try:
        result = await _send_message_upstream(body.key, body.to_user_name, body.content)
    except HTTPException:
        raise
    except Exception as exc:
        logger.exception("Send message upstream error: %s", exc)
        raise HTTPException(status_code=502, detail=f"upstream_error: {exc}") from exc
    return result
@app.post("/api/send-batch")
async def api_send_batch(body: BatchSendBody):
    """Send text messages to several recipients in one upstream request.

    ``HTTPException`` raised by the helper is passed through unchanged; any
    other failure is logged and reported as 502.
    """
    entries = []
    for entry in body.items:
        entries.append({"to_user_name": entry.to_user_name, "content": entry.content})
    try:
        result = await _send_batch_upstream(body.key, entries)
    except HTTPException:
        raise
    except Exception as exc:
        logger.exception("Batch send error: %s", exc)
        raise HTTPException(status_code=502, detail=f"upstream_error: {exc}") from exc
    return result
@app.post("/api/send-image")
async def api_send_image(body: SendImageBody):
    """Send an image message.

    The body fields map onto the upstream ``MsgItem`` (ImageContent,
    TextContent, ToUserName, AtWxIDList). ``HTTPException`` raised by the
    helper is passed through unchanged; any other failure is logged and
    reported as 502.
    """
    try:
        result = await _send_image_upstream(
            body.key,
            body.to_user_name,
            body.image_content,
            text_content=body.text_content or "",
            at_wxid_list=body.at_wxid_list,
        )
    except HTTPException:
        raise
    except Exception as exc:
        logger.exception("Send image error: %s", exc)
        raise HTTPException(status_code=502, detail=f"upstream_error: {exc}") from exc
    return result
def _log_contact_list_response_structure(raw: dict) -> None:
    """Log the shape of a GetContactList response from which no items were extracted.

    Logs the top-level keys, the keys of ``Data``/``data`` (or its type name),
    a short preview of its first five entries, and, when ``Data.ContactList``
    is an object, its keys plus the length and a sample of the first known
    username array found in it.

    A non-dict ``raw`` (e.g. a JSON list) is reported by type only. This is a
    diagnostic helper and must not raise on unexpected payloads.
    """
    if not isinstance(raw, dict):
        logger.info(
            "GetContactList response structure (no items extracted): non-dict payload of type %s",
            type(raw).__name__,
        )
        return

    keys_top = list(raw.keys())
    data = raw.get("Data") or raw.get("data")
    keys_data = list(data.keys()) if isinstance(data, dict) else (type(data).__name__ if data is not None else "None")
    logger.info(
        "GetContactList response structure (no items extracted): top_level_keys=%s, Data_keys=%s",
        keys_top,
        keys_data,
    )
    if not isinstance(data, dict):
        return

    for k, v in list(data.items())[:5]:
        preview = str(v)[:80] if v is not None else "null"
        logger.info("  Data.%s: %s", k, preview)

    # Commonly Data.ContactList is an object holding a contactUsernameList array.
    cl = data.get("ContactList") or data.get("contactList")
    if isinstance(cl, dict):
        logger.info("  Data.ContactList keys: %s", list(cl.keys()))
        for uk in ("contactUsernameList", "ContactUsernameList", "UserNameList", "userNameList", "usernameList"):
            arr = cl.get(uk)
            if isinstance(arr, list):
                logger.info("  Data.ContactList.%s length=%s, sample=%s", uk, len(arr), arr[:3] if arr else [])
                break
def _unwrap_wechat_field(v: Any) -> Any:
"""上游字段有时为 {'str': 'xxx'} 或 {'len': 0} 这种包装,这里尝试取出内部值。"""
if isinstance(v, dict):
if "str" in v:
return v.get("str")
if "len" in v and len(v) == 1:
return v.get("len")
return v
def _normalize_callback_message(raw: dict) -> dict:
    """Convert a callback ``message`` into the unified message dict.

    The result uses the field names ``MsgId``, ``FromUserName``,
    ``ToUserName``, ``Content``, ``MsgType`` and ``CreateTime`` so it can be
    handled like a synced message. Both snake_case and PascalCase source keys
    are read; sender, recipient and content are unwrapped from
    ``{"str": ...}``. All remaining source keys are copied over unchanged.

    Example input::

        {
            "key": "HBpEnbtj9BJZ",
            "message": {
                "msg_id": 126545176,
                "from_user_name": {"str": "zhang499142409"},
                "to_user_name": {"str": "wxid_xxx"},
                "msg_type": 1,
                "content": {"str": "测试"},
                ...
            },
            "type": "message"
        }

    Returns an empty dict when the message part is not a dict.
    """
    msg = raw.get("message") or raw
    if not isinstance(msg, dict):
        return {}

    sender = _unwrap_wechat_field(msg.get("from_user_name") or msg.get("FromUserName"))
    recipient = _unwrap_wechat_field(msg.get("to_user_name") or msg.get("ToUserName"))
    text = _unwrap_wechat_field(msg.get("content") or msg.get("Content"))

    normalized = {
        "MsgId": msg.get("msg_id") or msg.get("MsgId") or msg.get("new_msg_id"),
        "FromUserName": sender or "",
        "ToUserName": recipient or "",
        "Content": text or "",
        "MsgType": msg.get("msg_type") or msg.get("MsgType"),
        "CreateTime": msg.get("create_time") or msg.get("CreateTime"),
    }

    # Source keys already represented above; everything else is carried
    # along for debugging and richer display.
    consumed = frozenset((
        "msg_id", "MsgId", "new_msg_id",
        "from_user_name", "FromUserName",
        "to_user_name", "ToUserName",
        "content", "Content",
        "msg_type", "MsgType",
        "create_time", "CreateTime",
    ))
    normalized.update({name: value for name, value in msg.items() if name not in consumed})
    return normalized
def _normalize_contact_list(raw: Any) -> List[dict]:
"""将上游 GetContactList 多种返回格式统一为 [ { wxid, remark_name, ... } ]。"""
items: Any = []
if isinstance(raw, list):
items = raw
elif isinstance(raw, dict):
data = raw.get("Data") or raw.get("data") or raw
if isinstance(data, list):
items = data
elif isinstance(data, dict):
contact_list = (
data.get("ContactList")
or data.get("contactList")
or data.get("WxcontactList")
or data.get("wxcontactList")
or data.get("CachedContactList")
)
# 7006 格式ContactList 为对象,联系人 id 在 contactUsernameList 等数组里
if isinstance(contact_list, dict):
username_list = (
contact_list.get("contactUsernameList")
or contact_list.get("ContactUsernameList")
or contact_list.get("UserNameList")
or contact_list.get("userNameList")
or contact_list.get("usernameList")
or []
)
if isinstance(username_list, list) and username_list:
items = [{"wxid": (x if isinstance(x, str) else str(x)), "remark_name": (x if isinstance(x, str) else str(x))} for x in username_list]
else:
items = []
else:
items = contact_list if isinstance(contact_list, list) else (
data.get("List") or data.get("list") or data.get("items")
or data.get("Friends") or data.get("friends")
or data.get("MemberList") or data.get("memberList") or []
)
items = items or raw.get("items") or raw.get("list") or raw.get("List") or raw.get("ContactList") or raw.get("WxcontactList") or []
result = []
for x in items:
if isinstance(x, str):
result.append({"wxid": x, "remark_name": x})
continue
if not isinstance(x, dict):
continue
wxid = _unwrap_wechat_field(
x.get("wxid")
or x.get("Wxid")
or x.get("UserName")
or x.get("userName")
or x.get("Alias")
) or ""
remark = _unwrap_wechat_field(
x.get("remark_name")
or x.get("RemarkName")
or x.get("NickName")
or x.get("nickName")
or x.get("DisplayName")
) or wxid
result.append({"wxid": str(wxid).strip(), "remark_name": str(remark).strip()})
return result
async def _fetch_all_contact_usernames(key: str) -> List[str]:
    """
    Call /friend/GetContactList page by page and return the de-duplicated
    wxid list in first-seen order.

    The result is only meant to feed GetContactDetailsList. Paging stops
    after 50 rounds, on an upstream status >= 400, when both next sequence
    numbers are 0, or when they stop advancing. Any exception is logged and
    the usernames collected so far are returned.
    """
    url = f"{CHECK_STATUS_BASE_URL.rstrip('/')}/friend/GetContactList"
    usernames: List[str] = []
    seen: set = set()
    # Sequence 0/0 requests the first page.
    body: dict = {"CurrentChatRoomContactSeq": 0, "CurrentWxcontactSeq": 0}
    max_rounds = 50
    try:
        async with httpx.AsyncClient(trust_env=False, timeout=30.0) as client:
            for round_num in range(max_rounds):
                resp = await client.post(url, params={"key": key}, json=body)
                if resp.status_code >= 400:
                    logger.warning("GetContactList(round=%s) %s: %s", round_num + 1, resp.status_code, resp.text[:200])
                    break
                raw = resp.json()
                chunk = _normalize_contact_list(raw)
                # Retry normalisation on the unwrapped Data payload.
                if not chunk and isinstance(raw, dict):
                    chunk = _normalize_contact_list(raw.get("Data") or raw.get("data") or raw)
                # First round with no normalised result: read the id list directly from any
                # known array key under Data.ContactList.
                if not chunk and round_num == 0 and isinstance(raw, dict):
                    data = raw.get("Data") or raw.get("data") or {}
                    if isinstance(data, dict):
                        cl = data.get("ContactList") or data.get("contactList")
                        if isinstance(cl, dict):
                            ul = (
                                cl.get("contactUsernameList")
                                or cl.get("ContactUsernameList")
                                or cl.get("UserNameList")
                                or cl.get("userNameList")
                                or cl.get("usernameList")
                                or []
                            )
                            if isinstance(ul, list) and ul:
                                chunk = [{"wxid": (x if isinstance(x, str) else str(x)), "remark_name": ""} for x in ul]
                                logger.info("GetContactList fallback from Data.ContactList.* list, count=%s", len(chunk))
                    # Still nothing: log the response structure for diagnosis.
                    if not chunk:
                        _log_contact_list_response_structure(raw)
                for item in chunk or []:
                    wxid = (item.get("wxid") or "").strip()
                    if wxid and wxid not in seen:
                        seen.add(wxid)
                        usernames.append(wxid)
                next_chat, next_wx = _next_contact_seq(raw)
                # No further page.
                if next_chat == 0 and next_wx == 0:
                    break
                # Sequences did not advance: stop instead of requesting the same page again.
                if next_chat == body.get("CurrentChatRoomContactSeq") and next_wx == body.get("CurrentWxcontactSeq"):
                    break
                # A zero next value keeps the previous sequence for that list.
                body = {
                    "CurrentChatRoomContactSeq": next_chat or body.get("CurrentChatRoomContactSeq", 0),
                    "CurrentWxcontactSeq": next_wx or body.get("CurrentWxcontactSeq", 0),
                }
                if not body["CurrentChatRoomContactSeq"] and not body["CurrentWxcontactSeq"]:
                    break
    except Exception as e:
        logger.warning("GetContactList usernames error: %s", e)
    logger.info("GetContactList usernames total=%s", len(usernames))
    return usernames
async def _build_contact_index(key: str, force_refresh: bool = False) -> Dict[str, dict]:
    """
    Build the shared contact index for an account:
    - fetch every UserName through GetContactList;
    - fetch details in batches through /friend/GetContactDetailsList;
    - index each contact by wxid and, when not already taken, by nickname and remark.

    Each entry is a dict with wxid, remark_name, nick_name, pyinitial,
    quan_pin and the raw upstream item. The result is stored in
    _contact_index[key]; a non-empty cached index is returned as-is unless
    force_refresh is True. An empty cached index is always rebuilt.
    """
    if not force_refresh and key in _contact_index and _contact_index[key]:
        return _contact_index[key]
    usernames = await _fetch_all_contact_usernames(key)
    if not usernames:
        _contact_index[key] = {}
        return _contact_index[key]
    url = f"{CHECK_STATUS_BASE_URL.rstrip('/')}/friend/GetContactDetailsList"
    index: Dict[str, dict] = {}
    # Small batches requested concurrently until all return (the whole username list is
    # never sent as UserNames in a single request).
    batch_size = 10
    max_concurrent = 6
    sem = asyncio.Semaphore(max_concurrent)

    async def fetch_one_batch(client: httpx.AsyncClient, batch: List[str], batch_idx: int) -> List[dict]:
        # Fetch details for one batch; request errors and statuses >= 400 yield [].
        body = {"RoomWxIDList": [], "UserNames": batch}
        async with sem:
            try:
                resp = await client.post(url, params={"key": key}, json=body)
            except Exception as e:
                logger.warning("GetContactDetailsList batch %s error: %s", batch_idx, e)
                return []
        if resp.status_code >= 400:
            logger.warning("GetContactDetailsList batch %s %s: %s", batch_idx, resp.status_code, resp.text[:200])
            return []
        raw = resp.json()
        data = raw.get("Data") or raw.get("data") or raw
        items = []
        if isinstance(data, dict):
            items = (
                data.get("List")
                or data.get("list")
                or data.get("ContactDetailsList")
                or data.get("contacts")
                or data.get("contactList")
                or data.get("ContactList")  # upstream may use the capitalised key
                or []
            )
        elif isinstance(data, list):
            items = data
        if not isinstance(items, list):
            return []
        # Diagnostics are logged for the first batch only.
        if not items and batch_idx == 0:
            logger.info("GetContactDetailsList batch 0: data keys=%s, no list parsed", list(data.keys()) if isinstance(data, dict) else type(data).__name__)
        if batch_idx == 0 and items:
            sample = items[0]
            if isinstance(sample, dict):
                logger.info(
                    "GetContactDetailsList first batch item keys=%s",
                    list(sample.keys()),
                )
        return items

    async with httpx.AsyncClient(trust_env=False, timeout=30.0) as client:
        batches = [usernames[i : i + batch_size] for i in range(0, len(usernames), batch_size)]
        tasks = [fetch_one_batch(client, b, i) for i, b in enumerate(batches)]
        # return_exceptions=True: a failing batch becomes an exception object in results.
        results = await asyncio.gather(*tasks, return_exceptions=True)
    for i, one in enumerate(results):
        if isinstance(one, BaseException):
            logger.warning("GetContactDetailsList batch %s exception: %s", i, one)
            continue
        for d in one or []:
            if not isinstance(d, dict):
                continue
            # Skip only when upstream explicitly returns bitVal and it is not 3
            # (keep the contact when it is missing or equals 3, to avoid dropping contacts).
            try:
                bv = d.get("bitVal")
                if bv is not None and int(bv) != 3:
                    continue
            except (TypeError, ValueError):
                pass
            wxid = _unwrap_wechat_field(
                d.get("userName") or d.get("UserName") or d.get("user_name") or d.get("wxid")
            )
            wxid = (wxid or "").strip()
            if not wxid:
                continue
            nick = _unwrap_wechat_field(d.get("nickName") or d.get("NickName") or d.get("nick_name")) or ""
            nick = str(nick).strip()
            remark = _unwrap_wechat_field(
                d.get("remark") or d.get("RemarkName") or d.get("remark_name")
            ) or ""
            remark = str(remark).strip()
            pyinitial = _unwrap_wechat_field(d.get("pyinitial") or d.get("pyInitial") or d.get("PYInitial")) or ""
            pyinitial = str(pyinitial).strip()
            quan_pin = _unwrap_wechat_field(d.get("quanPin") or d.get("QuanPin") or d.get("fullPinyin")) or ""
            quan_pin = str(quan_pin).strip()
            info = {
                "wxid": wxid,
                "remark_name": remark or nick or wxid,
                "nick_name": nick,
                "pyinitial": pyinitial,
                "quan_pin": quan_pin,
                "raw": d,
            }
            index[wxid] = info
            # Nickname and remark are aliases; an existing entry under that name is kept.
            if nick and nick not in index:
                index[nick] = info
            if remark and remark not in index:
                index[remark] = info
    if usernames and not index:
        logger.warning(
            "Contact index empty for key=***%s despite usernames count=%s: GetContactDetailsList may return different structure or all items filtered",
            key[-4:] if len(key) >= 4 else "****",
            len(usernames),
        )
    _contact_index[key] = index
    logger.info("Contact index built for key=***%s, size=%s", key[-4:] if len(key) >= 4 else "****", len(index))
    return index
async def _resolve_contact_username(key: str, name: str) -> Optional[str]:
    """Resolve a nickname, remark or WeChat id to the contact's wxid.

    The stripped ``name`` is looked up in the contact index. Returns the
    wxid, or ``None`` when ``name`` is empty, unknown, or the entry has no
    wxid.
    """
    if not name:
        return None
    contacts = await _build_contact_index(key)
    entry = contacts.get(name.strip())
    if not (entry and isinstance(entry, dict)):
        return None
    resolved = (entry.get("wxid") or entry.get("UserName") or "").strip()
    return resolved or None
# 上游 GetContactList传 0 拉首页,响应可能带 NextWxcontactSeq/NextChatRoomContactSeq 表示还有后续页,需循环拉取全量
def _next_contact_seq(raw: dict) -> tuple:
"""从上游响应中解析下一页的 seq返回 (next_chatroom_seq, next_wxcontact_seq)。无下一页则返回 (0, 0)。"""
def _int(v, default: int = 0) -> int:
if v is None:
return default
try:
return int(v)
except (TypeError, ValueError):
return default
data = raw.get("Data") or raw.get("data") or raw
chatroom_seq = 0
wxcontact_seq = 0
if isinstance(data, dict):
chatroom_seq = _int(data.get("NextChatRoomContactSeq") or data.get("CurrentChatRoomContactSeq"), 0)
wxcontact_seq = _int(data.get("NextWxcontactSeq") or data.get("CurrentWxcontactSeq"), 0)
for k in ("NextChatRoomContactSeq", "NextWxcontactSeq"):
v = raw.get(k)
if v is not None:
if "ChatRoom" in k:
chatroom_seq = _int(v, 0)
else:
wxcontact_seq = _int(v, 0)
return (chatroom_seq, wxcontact_seq)
# Contact-list style endpoints must not be cached, so a 304 cannot leave the frontend with stale data.
_NO_CACHE_HEADERS = {"Cache-Control": "no-store, no-cache, must-revalidate", "Pragma": "no-cache"}
@app.get("/api/contact-list")
async def api_contact_list(
    key: str = Query(..., description="账号 key"),
    refresh: Optional[str] = Query(None, description="传 1/true/yes 时强制重新拉取,不用内存缓存"),
):
    """
    Return de-duplicated contact details for an account.

    The data comes from the shared contact index built by ``_build_contact_index``.
    That index may hold several keys pointing at the same contact, so the result is
    keyed by wxid and each contact appears once. The response always carries
    no-cache headers.

    Args:
        key: Account key.
        refresh: "1" / "true" / "yes" (case-insensitive) forces the index to be
            rebuilt instead of served from the in-memory cache.

    Returns:
        JSONResponse ``{"items": [...]}``. On any failure the error is logged and
        ``{"items": [], "error": "<message>"}`` is returned instead of raising.
    """
    masked_key = key[-4:] if len(key) >= 4 else "****"
    try:
        # When this coroutine is called directly from Python (not through FastAPI
        # routing) and `refresh` is omitted, it is still the unresolved Query(...)
        # default object rather than None/str. Treat anything that is not a string
        # as "no refresh" instead of failing on `.lower()`.
        force_refresh = isinstance(refresh, str) and refresh.lower() in ("1", "true", "yes")
        index = await _build_contact_index(key, force_refresh=force_refresh)
        # Return each contact only once, keyed by wxid.
        uniques: Dict[str, dict] = {}
        for info in index.values():
            if not isinstance(info, dict):
                continue
            wxid = (info.get("wxid") or "").strip()
            if not wxid or wxid in uniques:
                continue
            uniques[wxid] = {
                "wxid": wxid,
                "nick_name": info.get("nick_name") or "",
                # remark_name falls back to the nickname, then to the wxid.
                "remark_name": info.get("remark_name") or info.get("nick_name") or wxid,
                "pyinitial": info.get("pyinitial") or "",
                "quanPin": info.get("quan_pin") or "",
            }
        items = list(uniques.values())
        logger.info("api_contact_list key=***%s -> %s contacts", masked_key, len(items))
        return JSONResponse(content={"items": items}, headers=_NO_CACHE_HEADERS)
    except Exception as e:
        # Log the full traceback with the masked key to help diagnose contact loading failures.
        logger.exception("contact-list error for key=***%s: %s", masked_key, e)
        return JSONResponse(content={"items": [], "error": str(e)}, headers=_NO_CACHE_HEADERS)
@app.get("/api/friends")
async def api_list_friends(key: str = Query(..., description="账号 key")):
    """
    Friend list: returns exactly what ``/api/contact-list`` returns for this key.

    Note: the customer-profile fallback (``_friends_fallback``) is not used here;
    failures surface as ``{"items": [], "error": ...}`` from ``api_contact_list``.
    """
    # `refresh` must be passed explicitly. Calling the handler directly bypasses
    # FastAPI's parameter resolution, so an omitted argument would be the Query(...)
    # default object (truthy, no `.lower()`), and the handler would fail and return
    # an empty list on every call.
    return await api_contact_list(key, refresh=None)
def _friends_fallback(key: str) -> List[dict]:
    """
    Build a selectable contact list from the stored customer profiles.

    Intended for picking bulk-send recipients on the admin page. Customers without
    a wxid are left out; ``remark_name`` falls back to the wxid.
    """
    fallback: List[dict] = []
    for customer in store.list_customers(key):
        wxid = customer.get("wxid")
        if not wxid:
            continue
        fallback.append(
            {
                "wxid": wxid,
                "remark_name": customer.get("remark_name") or wxid,
                "id": customer.get("id"),
            }
        )
    return fallback
# ---------- AI reply takeover configuration (whitelist + super admins) ----------
class AIReplyConfigUpdate(BaseModel):
    # Request body for PATCH /api/ai-reply-config.
    # Account key whose AI-reply configuration is being updated.
    key: str
    # Both lists are forwarded unchanged to store.update_ai_reply_config;
    # they are None when the client omits them.
    super_admin_wxids: Optional[List[str]] = None
    whitelist_wxids: Optional[List[str]] = None
@app.get("/api/ai-reply-config")
async def api_get_ai_reply_config(key: str = Query(..., description="账号 key")):
    """
    Return the AI-reply configuration of the account: super-admin and whitelist wxids.

    When nothing is stored for ``key``, an empty configuration (both lists empty)
    is returned instead.
    """
    stored = store.get_ai_reply_config(key)
    if stored:
        return stored
    return {"key": key, "super_admin_wxids": [], "whitelist_wxids": []}
@app.patch("/api/ai-reply-config")
async def api_update_ai_reply_config(body: AIReplyConfigUpdate):
    """
    Update the AI-reply whitelist and super-admin list for an account.

    The request lists are handed to ``store.update_ai_reply_config`` and its result
    is returned as-is. Per the original note, only contacts on these lists are meant
    to receive automatic AI replies; that check is not performed in this handler.
    """
    updated = store.update_ai_reply_config(
        body.key,
        super_admin_wxids=body.super_admin_wxids,
        whitelist_wxids=body.whitelist_wxids,
    )
    return updated
@app.get("/api/ai-reply-status")
async def api_ai_reply_status(key: str = Query(..., description="账号 key")):
    """
    Health check for the AI takeover feature.

    Reports whether the message-sync WebSocket is connected, whether a whitelist or
    super-admin list is configured for ``key``, and whether a current model with an
    API key exists. ``ok`` is truthy only when all three hold; ``message`` names the
    first failing condition in that order.
    """
    ws_ok = is_ws_connected()

    cfg = store.get_ai_reply_config(key)
    if cfg:
        super_list = cfg.get("super_admin_wxids") or []
        white_list = cfg.get("whitelist_wxids") or []
    else:
        super_list, white_list = [], []
    has_allow_list = bool(super_list or white_list)

    model = store.get_current_model()
    has_model = bool(model and model.get("api_key"))

    ok = ws_ok and has_allow_list and has_model

    # Pick the user-facing hint for the first unmet requirement.
    if ok:
        message = "正常"
    elif not ws_ok:
        message = "未连接消息同步(WS)"
    elif not has_allow_list:
        message = "请在「AI 回复设置」添加并保存超级管理员或白名单"
    elif not has_model:
        message = "请在「模型管理」添加并选中当前模型"
    else:
        message = "未知"

    return {
        "ok": ok,
        "ws_connected": ws_ok,
        "has_ai_reply_config": bool(cfg),
        "has_whitelist_or_super_admin": has_allow_list,
        "super_admin_count": len(super_list),
        "whitelist_count": len(white_list),
        "has_current_model": has_model,
        "message": message,
    }
# ---------- Model management (switching between multiple models; API keys are configured per model) ----------
class ModelCreate(BaseModel):
    # Request body for POST /api/models.
    name: str
    provider: str  # qwen | openai | doubao (validated in api_create_model)
    api_key: str
    # Optional overrides; api_create_model converts None to "" before storing.
    base_url: Optional[str] = ""
    model_name: Optional[str] = ""
    # Forwarded as is_current to store.create_model; None is treated as False.
    is_current: Optional[bool] = False
class ModelUpdate(BaseModel):
    # Request body for PATCH /api/models/{model_id}. Every field is optional and is
    # forwarded unchanged to store.update_model.
    # NOTE(review): presumably None means "leave unchanged" — confirm against store.update_model.
    name: Optional[str] = None
    api_key: Optional[str] = None
    base_url: Optional[str] = None
    model_name: Optional[str] = None
def _mask_api_key(m: dict) -> dict:
    """
    Return a shallow copy of a model record with its API key hidden.

    A non-empty ``api_key`` is replaced by ``"***"``; the input dict is never
    mutated. Empty values and non-dict inputs are returned unchanged.
    """
    if m and isinstance(m, dict):
        masked = {**m}
        if masked.get("api_key"):
            masked["api_key"] = "***"
        return masked
    return m
@app.get("/api/models")
async def api_list_models():
    """List every configured model, with API keys masked."""
    masked_models = [_mask_api_key(model) for model in store.list_models()]
    return {"items": masked_models}
@app.get("/api/models/current")
async def api_get_current_model():
    """Return the currently selected model (API key masked), or ``{"current": None}``."""
    current = store.get_current_model()
    return {"current": _mask_api_key(current) if current else None}
@app.post("/api/models")
async def api_create_model(body: ModelCreate):
    """
    Create a model configuration and return it with the API key masked.

    Raises:
        HTTPException(400): ``provider`` is not one of qwen / openai / doubao.
    """
    allowed_providers = ("qwen", "openai", "doubao")
    if body.provider not in allowed_providers:
        raise HTTPException(status_code=400, detail="provider must be qwen, openai or doubao")
    created = store.create_model(
        name=body.name,
        provider=body.provider,
        api_key=body.api_key,
        base_url=body.base_url or "",
        model_name=body.model_name or "",
        is_current=body.is_current or False,
    )
    return _mask_api_key(created)
@app.patch("/api/models/{model_id}")
async def api_update_model(model_id: str, body: ModelUpdate):
    """
    Update a model configuration and return it with the API key masked.

    Raises:
        HTTPException(404): ``store.update_model`` returned nothing for ``model_id``.
    """
    updated = store.update_model(
        model_id,
        name=body.name,
        api_key=body.api_key,
        base_url=body.base_url,
        model_name=body.model_name,
    )
    if updated:
        return _mask_api_key(updated)
    raise HTTPException(status_code=404, detail="model not found")
@app.post("/api/models/{model_id}/set-current")
async def api_set_current_model(model_id: str):
    """
    Mark the given model as the current one and return it with the API key masked.

    Raises:
        HTTPException(404): ``store.set_current_model`` returned nothing for ``model_id``.
    """
    selected = store.set_current_model(model_id)
    if selected:
        return _mask_api_key(selected)
    raise HTTPException(status_code=404, detail="model not found")
@app.delete("/api/models/{model_id}")
async def api_delete_model(model_id: str):
    """
    Delete a model configuration.

    Raises:
        HTTPException(404): ``store.delete_model`` reported that nothing was deleted.
    """
    deleted = store.delete_model(model_id)
    if deleted:
        return {"ok": True}
    raise HTTPException(status_code=404, detail="model not found")
@app.post("/api/qwen/generate")
async def api_qwen_generate(body: QwenGenerateBody):
    """
    Generate text with the currently selected model (no environment-variable fallback).

    An optional system message is placed before the user prompt.

    Raises:
        HTTPException(503): ``llm_chat`` returned None, i.e. no usable model is configured.
    """
    messages = [{"role": "system", "content": body.system}] if body.system else []
    messages.append({"role": "user", "content": body.prompt})
    text = await llm_chat(messages)
    if text is not None:
        return {"text": text}
    raise HTTPException(status_code=503, detail="请在「模型管理」页添加并选中模型、填写 API Key")
@app.post("/api/qwen/generate-greeting")
async def api_qwen_generate_greeting(
    remark_name: str = Query(...),
    region: str = Query(""),
    tags: Optional[str] = Query(None),
):
    """
    Generate a short greeting with the currently selected model.

    Args:
        remark_name: Customer remark name, embedded in the prompt.
        region: Optional region, appended to the prompt when non-empty.
        tags: Optional comma-separated tags; blank entries are ignored.

    Raises:
        HTTPException(503): ``llm_chat`` returned None, i.e. no usable model is configured.
    """
    tag_list = [piece for piece in (raw.strip() for raw in (tags or "").split(",")) if piece]

    # Assemble the prompt piece by piece; optional parts are added only when present.
    prompt_parts = [f"请生成一句简短的微信问候语1-2句话客户备注名{remark_name}"]
    if region:
        prompt_parts.append(f",地区:{region}")
    if tag_list:
        joined_tags = ",".join(tag_list)
        prompt_parts.append(f",标签:{joined_tags}")
    prompt_parts.append("。不要解释,只输出问候语本身。")
    user = "".join(prompt_parts)

    text = await llm_chat([{"role": "user", "content": user}])
    if text is not None:
        return {"text": text}
    raise HTTPException(status_code=503, detail="请在「模型管理」页添加并选中模型、填写 API Key")
class LogoutBody(BaseModel):
    # Request body for POST /auth/logout.
    # Account key; forwarded to the upstream LogOut call as the "key" query parameter.
    key: str
@app.post("/auth/logout")
async def logout(body: LogoutBody):
    """
    Log the account out by calling the upstream ``/login/LogOut`` endpoint.

    Returns:
        The upstream response body, parsed as JSON.

    Raises:
        HTTPException(400): ``key`` is empty.
        HTTPException(502): the upstream call failed, or its body is not valid JSON.
    """
    key = body.key
    if not key:
        raise HTTPException(status_code=400, detail="key is required")
    # The key is a credential: log only its last 4 characters, as the rest of this file does.
    masked_key = key[-4:] if len(key) >= 4 else "****"
    url = f"{WECHAT_UPSTREAM_BASE_URL}/login/LogOut"
    logger.info("LogOut: key=***%s, url=%s", masked_key, url)
    try:
        async with httpx.AsyncClient(trust_env=False, timeout=15.0) as client:
            resp = await client.get(url, params={"key": key})
    except Exception as exc:
        logger.exception("Error calling upstream LogOut: %s", exc)
        raise HTTPException(status_code=502, detail=f"upstream_error: {exc}") from exc
    body_text = resp.text[:500]
    logger.info(
        "Upstream LogOut response: status=%s, body=%s",
        resp.status_code,
        body_text,
    )
    try:
        return resp.json()
    except ValueError as exc:
        # A non-JSON upstream body (e.g. an HTML error page) would otherwise surface
        # as an unhandled 500; report it as a bad-gateway error instead.
        logger.warning("Upstream LogOut returned a non-JSON body: %s", exc)
        raise HTTPException(status_code=502, detail="upstream_error: invalid JSON response") from exc
# Static page directory: mirrors the Node setup so that all static pages are also
# reachable when the backend is accessed directly.
# NOTE(review): the mount is at "/" — presumably it must stay after every API route
# registration so it does not shadow them; confirm before moving it.
_PUBLIC_DIR = os.path.join(os.path.dirname(__file__), "..", "public")
# Mount only when the directory actually exists.
if os.path.isdir(_PUBLIC_DIR):
    app.mount("/", StaticFiles(directory=_PUBLIC_DIR, html=True), name="static")