import asyncio
import html
import logging
import os
from contextlib import asynccontextmanager
from datetime import datetime
from typing import Any, List, Optional
from urllib.parse import urlencode

import httpx
from fastapi import FastAPI, HTTPException, Query, Request
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import HTMLResponse
from pydantic import BaseModel

try:
    from backend import store
    from backend.llm_client import chat as llm_chat
    from backend.ws_sync import is_ws_connected, set_message_callback, start_ws_sync
except ImportError:
    # Fallback for running the module from inside the backend directory.
    import store
    from llm_client import chat as llm_chat
    from ws_sync import is_ws_connected, set_message_callback, start_ws_sync

WECHAT_UPSTREAM_BASE_URL = os.getenv("WECHAT_UPSTREAM_BASE_URL", "http://localhost:8080").rstrip("/")
CHECK_STATUS_BASE_URL = os.getenv("CHECK_STATUS_BASE_URL", "http://113.44.162.180:7006").rstrip("/")
SLIDER_VERIFY_BASE_URL = os.getenv("SLIDER_VERIFY_BASE_URL", "http://113.44.162.180:7765").rstrip("/")
# NOTE(review): the fallback value below is a credential-like key committed to
# source; consider requiring it from the environment instead.
SLIDER_VERIFY_KEY = os.getenv("SLIDER_VERIFY_KEY", os.getenv("KEY", "408449830"))
# Send-text endpoint: per the upstream swagger this is POST /message/SendTextMessage
# with a SendMessageModel body (an array of MsgItem).
SEND_MSG_PATH = (os.getenv("SEND_MSG_PATH") or "/message/SendTextMessage").strip()

# Per-key cache of the QR-code response and its Data62, used by later login steps.
# NOTE(review): entries are never evicted in the code visible here.
qrcode_store: dict = {}

logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(name)s - %(message)s",
)
logger = logging.getLogger("wechat-backend")


def _on_ws_message(key: str, data: dict) -> None:
    """Persist a GetSyncMsg payload into the store.

    The message list is read from ``MsgList``, ``List`` or ``msgList``; if none
    of them holds a non-empty list, a list payload is stored as-is and anything
    else is stored as a single-element list.
    """
    msg_list = data.get("MsgList") or data.get("List") or data.get("msgList")
    if isinstance(msg_list, list) and msg_list:
        store.append_sync_messages(key, msg_list)
    elif isinstance(data, list):
        store.append_sync_messages(key, data)
    else:
        store.append_sync_messages(key, [data])


def _greeting_prompt(remark_name: str, region: str, tags: list) -> str:
    """Build the LLM prompt asking for a short greeting for one customer."""
    prompt = f"请生成一句简短的微信问候语(1-2句话),客户备注名:{remark_name}"
    if region:
        prompt += f",地区:{region}"
    if tags:
        joined = ",".join(tags)
        prompt += f",标签:{joined}"
    prompt += "。不要解释,只输出问候语本身。"
    return prompt


async def _greet_customer(task_id: Any, key: Any, customer: dict, template: str, use_qwen: bool) -> None:
    """Send one greeting to one customer; LLM and send failures are logged, not raised."""
    wxid = customer.get("wxid")
    if not wxid:
        return
    remark_name = (customer.get("remark_name") or "").strip() or wxid
    # The rendered template is both the non-LLM content and the LLM fallback.
    content = template.replace("{{name}}", remark_name)
    if use_qwen:
        generated = None
        try:
            prompt = _greeting_prompt(
                remark_name,
                (customer.get("region") or "").strip(),
                customer.get("tags") or [],
            )
            generated = await llm_chat([{"role": "user", "content": prompt}])
        except Exception as e:
            logger.warning("Greeting task %s llm_chat error: %s", task_id, e)
        if isinstance(generated, str) and generated.strip():
            content = generated
    try:
        await _send_message_upstream(key, wxid, content)
        logger.info("Greeting task %s sent to %s", task_id, wxid)
    except Exception as e:
        logger.warning("Greeting task %s send to %s failed: %s", task_id, wxid, e)


async def _execute_greeting_task(task: dict, now: datetime) -> None:
    """Run one greeting task if it is enabled, not yet executed, and due at ``now``."""
    if not task.get("enabled") or task.get("executed_at"):
        return
    send_time = task.get("send_time") or task.get("cron")
    if not send_time:
        return
    due_at = _parse_send_time(send_time)
    if not due_at or due_at > now:
        return

    task_id = task.get("id")
    key = task.get("key")
    wanted_tags = set(task.get("customer_tags") or [])
    template = (task.get("template") or "").strip() or "{{name}},您好!"
    use_qwen = bool(task.get("use_qwen"))

    customers = store.list_customers(key)
    if wanted_tags:
        customers = [c for c in customers if set(c.get("tags") or []) & wanted_tags]

    for customer in customers:
        # Isolate each customer: if one malformed record aborted the task before
        # executed_at was written, customers already greeted would be messaged
        # again on every scheduler pass.
        try:
            await _greet_customer(task_id, key, customer, template, use_qwen)
        except Exception as e:
            logger.warning("Greeting task %s customer failed: %s", task_id, e)

    store.update_greeting_task(task_id, executed_at=now.isoformat(), enabled=False)
    logger.info("Greeting task %s executed_at set", task_id)


async def _run_greeting_scheduler() -> None:
    """Every 30 seconds, send all due greeting tasks and mark them executed.

    Runs until cancelled. A failure in one task is logged and does not prevent
    the remaining tasks of the same pass from running.
    """
    check_interval = 30
    while True:
        try:
            await asyncio.sleep(check_interval)
            now = datetime.now()
            for task in store.list_greeting_tasks(key=None):
                try:
                    await _execute_greeting_task(task, now)
                except Exception as e:
                    task_id = task.get("id") if isinstance(task, dict) else None
                    logger.exception("Greeting task %s failed: %s", task_id, e)
        except asyncio.CancelledError:
            break
        except Exception as e:
            logger.exception("Greeting scheduler error: %s", e)


@asynccontextmanager
async def lifespan(app: FastAPI):
    """Start the WS sync and greeting scheduler on startup; cancel both on shutdown."""
    set_message_callback(_on_ws_message)
    # Keep strong references: the event loop only holds weak references to
    # tasks, so an unreferenced task can be garbage-collected mid-execution.
    background = [
        asyncio.create_task(start_ws_sync()),
        asyncio.create_task(_run_greeting_scheduler()),
    ]
    try:
        yield
    finally:
        for task in background:
            task.cancel()
        # return_exceptions=True absorbs the CancelledError of each task.
        await asyncio.gather(*background, return_exceptions=True)


app = FastAPI(title="WeChat Admin Backend (FastAPI)", lifespan=lifespan)

# NOTE(review): wildcard origins combined with allow_credentials=True is very
# permissive; confirm this is intended outside of development.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)


class QrCodeRequest(BaseModel):
    """Body of POST /auth/qrcode; every field except ``key`` is forwarded upstream."""

    key: str
    Proxy: Optional[str] = ""
    IpadOrmac: Optional[str] = ""
    Check: Optional[bool] = False


@app.middleware("http")
async def log_requests(request: Request, call_next):
    """Log method, path and client host before, and status code after, each request."""
    logger.info(
        "HTTP %s %s from %s",
        request.method,
        request.url.path,
        request.client.host if request.client else "-",
    )
    response = await call_next(request)
    logger.info("HTTP %s %s -> %s", request.method, request.url.path, response.status_code)
    return response


def _upstream_json(resp: httpx.Response, operation: str) -> Any:
    """Decode an upstream response as JSON, raising HTTP 502 if the body is not JSON."""
    try:
        return resp.json()
    except ValueError as exc:
        preview = resp.text[:500]
        logger.warning("Upstream %s returned non-JSON body: %s", operation, preview)
        raise HTTPException(
            status_code=502,
            detail=f"upstream_invalid_json: {preview}",
        ) from exc


@app.get("/health")
async def health() -> dict:
    """Liveness probe; also reports the configured upstream base URL."""
    logger.info("Health check")
    return {"status": "ok", "backend": "fastapi", "upstream": WECHAT_UPSTREAM_BASE_URL}


@app.get("/api/ws-status")
async def api_ws_status() -> dict:
    """Report the WS (GetSyncMsg) connection state so the frontend can redirect to login when it drops."""
    return {"connected": is_ws_connected()}


@app.post("/auth/qrcode")
async def get_login_qrcode(body: QrCodeRequest):
    """Fetch a login QR code from upstream and cache its Data62 under ``key``.

    Raises:
        HTTPException: 400 if ``key`` is empty; 502 if upstream is unreachable,
            answers with status >= 400, or returns a non-JSON body.
    """
    key = body.key
    if not key:
        raise HTTPException(status_code=400, detail="key is required")

    payload = body.dict(exclude={"key"})
    url = f"{WECHAT_UPSTREAM_BASE_URL}/login/GetLoginQrCodeNewDirect"
    logger.info("GetLoginQrCodeNewDirect: key=%s, payload=%s, url=%s", key, payload, url)

    try:
        async with httpx.AsyncClient(timeout=20.0) as client:
            resp = await client.post(url, params={"key": key}, json=payload)
    except Exception as exc:
        logger.exception("Error calling upstream GetLoginQrCodeNewDirect: %s", exc)
        raise HTTPException(
            status_code=502,
            detail={"error": "upstream_connect_error", "detail": str(exc)},
        ) from exc

    body_text = resp.text[:500]
    if resp.status_code >= 400:
        logger.warning(
            "Upstream GetLoginQrCodeNewDirect bad response: status=%s, body=%s",
            resp.status_code,
            body_text,
        )
        raise HTTPException(
            status_code=502,
            detail={
                "error": "upstream_bad_response",
                "status_code": resp.status_code,
                "body": body_text,
            },
        )

    logger.info(
        "Upstream GetLoginQrCodeNewDirect success: status=%s, body=%s",
        resp.status_code,
        body_text,
    )
    data = _upstream_json(resp, "GetLoginQrCodeNewDirect")

    # Step 1: keep the full response and its Data62 so that step 2 (the slider
    # form) can pre-fill its parameters. Best effort: a failure here is logged
    # and the upstream data is still returned.
    try:
        data62 = data.get("Data62") or (data.get("Data") or {}).get("data62") or ""
        qrcode_store[key] = {"data62": data62, "response": data}
        # Mark the response so later steps know Data62 is cached under this key.
        data["_data62_stored"] = True
        data["_data62_length"] = len(data62)
        logger.info("Stored Data62 for key=%s (len=%s)", key, len(data62))
    except Exception as e:
        logger.warning("Store qrcode data for key=%s failed: %s", key, e)
    return data


@app.get("/auth/status")
async def get_online_status(
    key: str = Query(..., description="账号唯一标识"),
):
    """Proxy upstream GetLoginStatus for ``key`` and return its JSON body.

    Raises:
        HTTPException: 400 if ``key`` is empty; 502 if upstream is unreachable
            or returns a non-JSON body.
    """
    if not key:
        raise HTTPException(status_code=400, detail="key is required")

    url = f"{WECHAT_UPSTREAM_BASE_URL}/login/GetLoginStatus"
    logger.info("GetLoginStatus: key=%s, url=%s", key, url)

    try:
        async with httpx.AsyncClient(timeout=15.0) as client:
            resp = await client.get(url, params={"key": key})
    except Exception as exc:
        logger.exception("Error calling upstream GetLoginStatus: %s", exc)
        raise HTTPException(status_code=502, detail=f"upstream_error: {exc}") from exc

    body_text = resp.text[:500]
    logger.info(
        "Upstream GetLoginStatus response: status=%s, body=%s",
        resp.status_code,
        body_text,
    )
    return _upstream_json(resp, "GetLoginStatus")


def _extract_clean_ticket(obj: dict) -> Optional[str]:
    """Extract the ticket from a scan-status response, dropping trailing garbage.

    The ticket is looked up in ``Data.ticket``, ``ticket``, ``Ticket`` and
    finally the ``ticket=`` query parameter of ``wechat_verify_url``. Only the
    leading run of printable ASCII (codes 32-126) is kept.

    Returns:
        The cleaned ticket, or ``None`` if no usable ticket is found.
    """
    if not obj or not isinstance(obj, dict):
        return None

    inner = obj.get("Data")
    if not isinstance(inner, dict):
        inner = obj
    raw = (inner.get("ticket") if inner else None) or obj.get("ticket") or obj.get("Ticket")

    if not raw:
        verify_url = obj.get("wechat_verify_url") or ""
        if isinstance(verify_url, str) and "ticket=" in verify_url:
            raw = verify_url.split("ticket=", 1)[1].split("&")[0]
    if not raw or not isinstance(raw, str):
        return None

    # Stop at the first character outside printable ASCII (this also covers
    # U+FFFD, the replacement character produced by bad decoding).
    end = 0
    for ch in raw:
        if not 32 <= ord(ch) <= 126:
            break
        end += 1
    return raw[:end] or None


@app.get("/auth/scan-status")
async def check_scan_status(
    key: str = Query(..., description="账号唯一标识"),
):
    """Proxy CheckLoginStatus; when a ticket is present, attach a ``slider_url``.

    Raises:
        HTTPException: 400 if ``key`` is empty; 502 if upstream is unreachable
            or returns a non-JSON body.
    """
    if not key:
        raise HTTPException(status_code=400, detail="key is required")

    url = f"{CHECK_STATUS_BASE_URL}/login/CheckLoginStatus"
    logger.info("CheckLoginStatus: key=%s, url=%s", key, url)

    try:
        async with httpx.AsyncClient(timeout=15.0) as client:
            resp = await client.get(url, params={"key": key})
    except Exception as exc:
        logger.exception("Error calling upstream CheckLoginStatus: %s", exc)
        raise HTTPException(status_code=502, detail=f"upstream_error: {exc}") from exc

    logger.info(
        "Upstream CheckLoginStatus response: status=%s, body=%s",
        resp.status_code,
        resp.text[:2000],
    )
    data = _upstream_json(resp, "CheckLoginStatus")

    ticket = _extract_clean_ticket(data)
    if ticket:
        # The slider service is not called here. Instead return the path of a
        # local page whose form is pre-filled with Key/Data62/Original Ticket;
        # the user submits it to the third-party service from an iframe.
        data62 = (qrcode_store.get(key) or {}).get("data62") or ""
        params = {"key": SLIDER_VERIFY_KEY, "ticket": ticket}
        if data62:
            params["data62"] = data62
        data["slider_url"] = f"/auth/slider-form?{urlencode(params)}"
        logger.info(
            "Attached slider_url (slider-form) for key=%s (ticket len=%s, data62 len=%s)",
            key,
            len(ticket),
            len(data62),
        )
    return data


def _slider_form_html(key_val: str, data62_val: str, ticket_val: str) -> str:
    """Build the slider-verification form page with Key, Data62 and Original Ticket pre-filled."""
    k = html.escape(key_val, quote=True)
    d = html.escape(data62_val, quote=True)
    t = html.escape(ticket_val, quote=True)
    action = html.escape(SLIDER_VERIFY_BASE_URL, quote=True)
    # NOTE(review): the HTML markup of this template is not present in the text
    # under review -- only its visible text fragments are. As written, k, d, t
    # and action are never interpolated. Restore the original form markup from
    # version control; the string below is reproduced exactly as seen.
    return f""" 滑块验证

滑块验证

参数已自动填充,点击「开始验证」将提交到第三方滑块服务。

"""


@app.get("/auth/slider-form", response_class=HTMLResponse)
async def slider_form(
    key: str = Query(..., description="Key(提交到第三方滑块)"),
    data62: str = Query("", description="Data62"),
    ticket: str = Query(..., description="Original Ticket"),
):
    """Return the pre-filled slider form page (Key/Data62/Original Ticket) for loading in an iframe."""
    return HTMLResponse(content=_slider_form_html(key, data62, ticket))


# ---------- R1-2 customer profiles / R1-3 scheduled greetings / R1-4 group push / messages & sending ----------


class CustomerCreate(BaseModel):
    """Body of POST /api/customers."""

    key: str
    wxid: str
    remark_name: Optional[str] = ""
    region: Optional[str] = ""
    age: Optional[str] = ""
    gender: Optional[str] = ""
    level: Optional[str] = ""  # purchasing tier
    tags: Optional[List[str]] = None


class GreetingTaskCreate(BaseModel):
    """Body of POST /api/greeting-tasks."""

    key: str
    name: str
    send_time: str  # ISO trigger time, e.g. 2026-03-11T14:30:00; must be in the future
    customer_tags: Optional[List[str]] = None
    template: str
    use_qwen: Optional[bool] = False


class ProductTagCreate(BaseModel):
    """Body of POST /api/product-tags."""

    key: str
    name: str


class PushGroupCreate(BaseModel):
    """Body of POST /api/push-groups."""

    key: str
    name: str
    customer_ids: Optional[List[str]] = None
    tag_ids: Optional[List[str]] = None


class PushTaskCreate(BaseModel):
    """Body of POST /api/push-tasks."""

    key: str
    product_tag_id: str
    group_id: str
    content: str
    send_at: Optional[str] = None


class SendMessageBody(BaseModel):
    """Body of POST /api/send-message."""

    key: str
    to_user_name: str
    content: str


class QwenGenerateBody(BaseModel):
    """Body of POST /api/qwen/generate."""

    prompt: str
    system: Optional[str] = None


@app.get("/api/customers")
async def api_list_customers(key: str = Query(..., description="账号 key")):
    """List the customers stored under ``key``."""
    return {"items": store.list_customers(key)}


@app.post("/api/customers")
async def api_upsert_customer(body: CustomerCreate):
    """Create or update a customer; unset optional text fields are stored as ""."""
    row = store.upsert_customer(
        body.key,
        body.wxid,
        remark_name=body.remark_name or "",
        region=body.region or "",
        age=body.age or "",
        gender=body.gender or "",
        level=body.level or "",
        tags=body.tags,
    )
    return row


@app.get("/api/customers/{customer_id}")
async def api_get_customer(customer_id: str):
    """Return one customer, or 404 if the store has no such id."""
    row = store.get_customer(customer_id)
    if not row:
        raise HTTPException(status_code=404, detail="customer not found")
    return row
@app.get("/api/customer-tags")
async def api_list_customer_tags(key: str = Query(..., description="账号 key")):
    """Return every tag used in the customer records under ``key`` (for dropdowns such as scheduled tasks)."""
    return {"tags": store.list_customer_tags(key)}


@app.delete("/api/customers/{customer_id}")
async def api_delete_customer(customer_id: str):
    """Delete one customer, or 404 if the store reports nothing was deleted."""
    if not store.delete_customer(customer_id):
        raise HTTPException(status_code=404, detail="customer not found")
    return {"ok": True}


@app.get("/api/greeting-tasks")
async def api_list_greeting_tasks(key: str = Query(..., description="账号 key")):
    """List the greeting tasks stored under ``key``."""
    return {"items": store.list_greeting_tasks(key)}


def _parse_send_time(s: str) -> Optional[datetime]:
    """Parse a trigger-time string into a naive local ``datetime``.

    Accepted forms are ISO-8601 with a ``T`` separator and
    ``YYYY-MM-DD HH:MM:SS``. Naive input is interpreted as local time and only
    its first 19 characters are considered. Input that carries a UTC offset
    (or a trailing ``Z``) is converted to local time rather than having the
    offset silently discarded, so it compares correctly with ``datetime.now()``.

    Returns:
        The parsed datetime (second precision, no tzinfo), or ``None`` when the
        input is not a string or cannot be parsed.
    """
    if not isinstance(s, str):
        return None
    try:
        if "T" not in s:
            return datetime.strptime(s[:19], "%Y-%m-%d %H:%M:%S")

        text = s.strip()
        # datetime.fromisoformat() rejects a trailing "Z" before Python 3.11.
        if text[-1:] in ("Z", "z"):
            text = text[:-1] + "+00:00"
        try:
            aware = datetime.fromisoformat(text)
        except ValueError:
            aware = None
        if aware is not None and aware.tzinfo is not None:
            return aware.astimezone().replace(tzinfo=None, microsecond=0)

        # Naive (or otherwise unparseable-in-full) input: legacy behaviour,
        # look only at the leading "YYYY-MM-DDTHH:MM:SS" portion.
        return datetime.fromisoformat(s[:19])
    except ValueError:
        return None


@app.post("/api/greeting-tasks")
async def api_create_greeting_task(body: GreetingTaskCreate):
    """Create a greeting task; the trigger time must parse and lie in the future (else 400)."""
    dt = _parse_send_time(body.send_time)
    if not dt:
        raise HTTPException(status_code=400, detail="触发时间格式无效,请使用 日期+时分秒 选择器")
    if dt <= datetime.now():
        raise HTTPException(status_code=400, detail="触发时间必须是未来时间,请重新选择")
    row = store.create_greeting_task(
        body.key,
        body.name,
        body.send_time,
        customer_tags=body.customer_tags or [],
        template=body.template,
        use_qwen=body.use_qwen or False,
    )
    return row


@app.patch("/api/greeting-tasks/{task_id}")
async def api_update_greeting_task(task_id: str, body: dict):
    """Update whitelisted fields of a greeting task.

    Raises:
        HTTPException: 400 if a supplied ``send_time`` is invalid or not in the
            future; 404 if the store returns no row for ``task_id``.
    """
    if "send_time" in body:
        dt = _parse_send_time(body["send_time"])
        if not dt:
            raise HTTPException(status_code=400, detail="触发时间格式无效")
        if dt <= datetime.now():
            raise HTTPException(status_code=400, detail="触发时间必须是未来时间")

    # Only these keys may be changed by clients; everything else is ignored.
    allowed = ("name", "send_time", "customer_tags", "template", "use_qwen", "enabled")
    changes = {field: value for field, value in body.items() if field in allowed}
    row = store.update_greeting_task(task_id, **changes)
    if not row:
        raise HTTPException(status_code=404, detail="task not found")
    return row


@app.delete("/api/greeting-tasks/{task_id}")
async def api_delete_greeting_task(task_id: str):
    """Delete one greeting task, or 404 if the store reports nothing was deleted."""
    if not store.delete_greeting_task(task_id):
        raise HTTPException(status_code=404, detail="task not found")
    return {"ok": True}


@app.get("/api/product-tags")
async def api_list_product_tags(key: str = Query(..., description="账号 key")):
    """List the product tags stored under ``key``."""
    return {"items": store.list_product_tags(key)}


@app.post("/api/product-tags")
async def api_create_product_tag(body: ProductTagCreate):
    """Create a product tag and return the stored row."""
    return store.create_product_tag(body.key, body.name)


@app.delete("/api/product-tags/{tag_id}")
async def api_delete_product_tag(tag_id: str):
    """Delete one product tag, or 404 if the store reports nothing was deleted."""
    if not store.delete_product_tag(tag_id):
        raise HTTPException(status_code=404, detail="tag not found")
    return {"ok": True}


@app.get("/api/push-groups")
async def api_list_push_groups(key: str = Query(..., description="账号 key")):
    """List the push groups stored under ``key``."""
    return {"items": store.list_push_groups(key)}


@app.post("/api/push-groups")
async def api_create_push_group(body: PushGroupCreate):
    """Create a push group; missing id lists are stored as empty lists."""
    return store.create_push_group(body.key, body.name, body.customer_ids or [], body.tag_ids or [])


@app.patch("/api/push-groups/{group_id}")
async def api_update_push_group(group_id: str, body: dict):
    """Update a push group's name and id lists; absent fields are passed to the store as ``None``."""
    row = store.update_push_group(
        group_id,
        name=body.get("name"),
        customer_ids=body.get("customer_ids"),
        tag_ids=body.get("tag_ids"),
    )
    if not row:
        raise HTTPException(status_code=404, detail="group not found")
    return row


@app.delete("/api/push-groups/{group_id}")
async def api_delete_push_group(group_id: str):
    """Delete one push group, or 404 if the store reports nothing was deleted."""
    if not store.delete_push_group(group_id):
        raise HTTPException(status_code=404, detail="group not found")
    return {"ok": True}


@app.get("/api/push-tasks")
async def api_list_push_tasks(
    key: str = Query(..., description="账号 key"),
    limit: int = Query(100, le=500),
):
    """List up to ``limit`` push tasks stored under ``key``."""
    return {"items": store.list_push_tasks(key, limit=limit)}


@app.post("/api/push-tasks")
async def api_create_push_task(body: PushTaskCreate):
    """Create a push task and return the stored row."""
    return store.create_push_task(body.key, body.product_tag_id, body.group_id, body.content, body.send_at)


@app.get("/api/messages")
async def api_list_messages(
    key: str = Query(..., description="账号 key"),
    limit: int = Query(100, le=500),
):
    """List up to ``limit`` synced messages stored under ``key``."""
    return {"items": store.list_sync_messages(key, limit=limit)}


async def _send_message_upstream(key: str, to_user_name: str, content: str) -> dict:
    """Send one text message through the upstream API and record it as sent.

    Returns:
        The upstream JSON body, or ``{"ok": True, "raw": <first 500 chars>}``
        when the body is not JSON.

    Raises:
        HTTPException: 502 when upstream answers with status >= 400.
        Transport errors from httpx propagate to the caller unchanged.
    """
    # Tolerate a SEND_MSG_PATH override that omits the leading slash.
    path = SEND_MSG_PATH if SEND_MSG_PATH.startswith("/") else f"/{SEND_MSG_PATH}"
    url = f"{WECHAT_UPSTREAM_BASE_URL.rstrip('/')}{path}"
    payload = {"MsgItem": [{"ToUserName": to_user_name, "MsgType": 1, "TextContent": content}]}

    async with httpx.AsyncClient(timeout=15.0) as client:
        resp = await client.post(url, params={"key": key}, json=payload)

    if resp.status_code >= 400:
        body_preview = resp.text[:400] if resp.text else ""
        logger.warning("Send message upstream %s: %s", resp.status_code, body_preview)
        raise HTTPException(
            status_code=502,
            detail=f"upstream_returned_{resp.status_code}: {body_preview}",
        )

    # Record the outgoing message only once upstream has accepted it.
    store.append_sent_message(key, to_user_name, content)
    try:
        return resp.json()
    except Exception:
        return {"ok": True, "raw": resp.text[:500]}


@app.post("/api/send-message")
async def api_send_message(body: SendMessageBody):
    """Send a text message; any non-HTTP failure is reported as 502."""
    try:
        return await _send_message_upstream(body.key, body.to_user_name, body.content)
    except HTTPException:
        raise
    except Exception as exc:
        logger.exception("Send message upstream error: %s", exc)
        raise HTTPException(status_code=502, detail=f"upstream_error: {exc}") from exc


# ---------- Model management (switch between models; API key configured per model) ----------


class ModelCreate(BaseModel):
    """Body of POST /api/models."""

    name: str
    provider: str  # qwen | openai | doubao (validated in api_create_model)
    api_key: str
    base_url: Optional[str] = ""
    model_name: Optional[str] = ""
    is_current: Optional[bool] = False


class ModelUpdate(BaseModel):
    """Body of PATCH /api/models/{model_id}; unset fields are passed to the store as ``None``."""

    name: Optional[str] = None
    api_key: Optional[str] = None
    base_url: Optional[str] = None
    model_name: Optional[str] = None


def _mask_api_key(m: dict) -> dict:
    """Return a shallow copy of ``m`` with a non-empty ``api_key`` replaced by ``"***"``.

    Falsy or non-dict input is returned unchanged.
    """
    if not m or not isinstance(m, dict):
        return m
    out = dict(m)
    if out.get("api_key"):
        out["api_key"] = "***"
    return out


@app.get("/api/models")
async def api_list_models():
    """List all configured models with their API keys masked."""
    return {"items": [_mask_api_key(m) for m in store.list_models()]}


@app.get("/api/models/current")
async def api_get_current_model():
    """Return the currently selected model (API key masked), or ``{"current": None}``."""
    m = store.get_current_model()
    if not m:
        return {"current": None}
    return {"current": _mask_api_key(m)}


@app.post("/api/models")
async def api_create_model(body: ModelCreate):
    """Create a model configuration; 400 if the provider is not supported."""
    if body.provider not in ("qwen", "openai", "doubao"):
        raise HTTPException(status_code=400, detail="provider must be qwen, openai or doubao")
    row = store.create_model(
        name=body.name,
        provider=body.provider,
        api_key=body.api_key,
        base_url=body.base_url or "",
        model_name=body.model_name or "",
        is_current=body.is_current or False,
    )
    return _mask_api_key(row)


@app.patch("/api/models/{model_id}")
async def api_update_model(model_id: str, body: ModelUpdate):
    """Update a model configuration; 404 if the store returns no row."""
    row = store.update_model(
        model_id,
        name=body.name,
        api_key=body.api_key,
        base_url=body.base_url,
        model_name=body.model_name,
    )
    if not row:
        raise HTTPException(status_code=404, detail="model not found")
    return _mask_api_key(row)


@app.post("/api/models/{model_id}/set-current")
async def api_set_current_model(model_id: str):
    """Select ``model_id`` as the current model; 404 if the store returns no row."""
    row = store.set_current_model(model_id)
    if not row:
        raise HTTPException(status_code=404, detail="model not found")
    return _mask_api_key(row)


@app.delete("/api/models/{model_id}")
async def api_delete_model(model_id: str):
    """Delete one model configuration, or 404 if the store reports nothing was deleted."""
    if not store.delete_model(model_id):
        raise HTTPException(status_code=404, detail="model not found")
    return {"ok": True}


@app.post("/api/qwen/generate")
async def api_qwen_generate(body: QwenGenerateBody):
    """Generate text with the currently selected model (no environment-variable fallback).

    Raises:
        HTTPException: 503 when ``llm_chat`` returns ``None``.
    """
    messages = []
    if body.system:
        messages.append({"role": "system", "content": body.system})
    messages.append({"role": "user", "content": body.prompt})
    text = await llm_chat(messages)
    if text is None:
        raise HTTPException(status_code=503, detail="请在「模型管理」页添加并选中模型、填写 API Key")
    return {"text": text}


@app.post("/api/qwen/generate-greeting")
async def api_qwen_generate_greeting(
    remark_name: str = Query(...),
    region: str = Query(""),
    tags: Optional[str] = Query(None),
):
    """Generate a short greeting for one customer with the currently selected model.

    ``tags`` is a comma-separated string; blank entries are ignored.

    Raises:
        HTTPException: 503 when ``llm_chat`` returns ``None``.
    """
    tag_list = [t.strip() for t in (tags or "").split(",") if t.strip()]
    user = f"请生成一句简短的微信问候语(1-2句话),客户备注名:{remark_name}"
    if region:
        user += f",地区:{region}"
    if tag_list:
        joined = ",".join(tag_list)
        user += f",标签:{joined}"
    user += "。不要解释,只输出问候语本身。"
    text = await llm_chat([{"role": "user", "content": user}])
    if text is None:
        raise HTTPException(status_code=503, detail="请在「模型管理」页添加并选中模型、填写 API Key")
    return {"text": text}


class LogoutBody(BaseModel):
    """Body of POST /auth/logout."""

    key: str


@app.post("/auth/logout")
async def logout(body: LogoutBody):
    """Proxy upstream LogOut for ``key`` and return its JSON body.

    Raises:
        HTTPException: 400 if ``key`` is empty; 502 if upstream is unreachable
            or returns a non-JSON body.
    """
    key = body.key
    if not key:
        raise HTTPException(status_code=400, detail="key is required")

    url = f"{WECHAT_UPSTREAM_BASE_URL}/login/LogOut"
    logger.info("LogOut: key=%s, url=%s", key, url)

    try:
        async with httpx.AsyncClient(timeout=15.0) as client:
            resp = await client.get(url, params={"key": key})
    except Exception as exc:
        logger.exception("Error calling upstream LogOut: %s", exc)
        raise HTTPException(status_code=502, detail=f"upstream_error: {exc}") from exc

    body_text = resp.text[:500]
    logger.info(
        "Upstream LogOut response: status=%s, body=%s",
        resp.status_code,
        body_text,
    )
    try:
        return resp.json()
    except ValueError as exc:
        # A non-JSON upstream body is an upstream fault, not a server error here.
        raise HTTPException(status_code=502, detail=f"upstream_invalid_json: {body_text}") from exc