# -*- coding: utf-8 -*- """后台 WebSocket 客户端:连接 7006 GetSyncMsg,接收消息并写入 store。""" import asyncio import json import logging import os from typing import Any, Callable, Dict, Optional logger = logging.getLogger("wechat-backend.ws_sync") WS_BASE_URL = os.getenv("WECHAT_WS_BASE_URL", "").rstrip("/") or os.getenv("CHECK_STATUS_BASE_URL", "http://113.44.162.180:7006").rstrip("/").replace("http://", "ws://").replace("https://", "wss://") # 与 7006 GetSyncMsg 建立连接时使用的 key,必须与登录页使用的账号 key 一致,否则收不到该账号的消息 # 优先读取 WECHAT_WS_KEY,未设置时使用 KEY(与登录参数一致) DEFAULT_KEY = (os.getenv("WECHAT_WS_KEY") or os.getenv("KEY") or os.getenv("WS_KEY") or "").strip() or "HBpEnbtj9BJZ" try: import websockets except ImportError: websockets = None # type: ignore _message_callback: Optional[Callable[[str, Dict], Any]] = None _ws_connected: bool = False def set_message_callback(cb: Callable[[str, Dict], Any]) -> None: global _message_callback _message_callback = cb async def _run_ws(key: str) -> None: if not websockets: logger.warning("websockets not installed, skip GetSyncMsg") return url = f"{WS_BASE_URL}/ws/GetSyncMsg?key={key}" logger.info("WS connecting to %s", url) global _ws_connected while True: try: _ws_connected = False async with websockets.connect(url, ping_interval=20, ping_timeout=10, close_timeout=5) as ws: _ws_connected = True logger.info("WS connected for key=%s", key) while True: raw = await ws.recv() try: data = json.loads(raw) if isinstance(raw, str) else json.loads(raw.decode("utf-8")) except Exception: data = {"raw": str(raw)[:500]} if _message_callback: try: _message_callback(key, data) except Exception as e: logger.exception("message_callback error: %s", e) except asyncio.CancelledError: _ws_connected = False raise except Exception as e: _ws_connected = False logger.warning("WS disconnected for key=%s: %s, reconnect in 5s", key, e) await asyncio.sleep(5) def is_ws_connected() -> bool: return _ws_connected def _mask_key(key: str) -> str: """仅显示末尾 4 位,便于核对 key 是否配置正确。""" if not key or len(key) < 5: return "****" return "***" + key[-4:] async def start_ws_sync(key: Optional[str] = None) -> None: k = (key or DEFAULT_KEY).strip() if not k: logger.warning("No KEY for WS GetSyncMsg, skip. 请设置 WECHAT_WS_KEY 或 KEY,且与登录页账号 key 一致") return logger.info("WS GetSyncMsg 使用 key=%s(与登录页 key 一致时才能收到该账号消息)", _mask_key(k)) await _run_ws(k)