diff --git a/backend/data/ai_reply_config.json b/backend/data/ai_reply_config.json new file mode 100644 index 0000000..f9f28a4 --- /dev/null +++ b/backend/data/ai_reply_config.json @@ -0,0 +1,11 @@ +[ + { + "key": "HBpEnbtj9BJZ", + "super_admin_wxids": [ + "wxid_f2q8xscgg31322" + ], + "whitelist_wxids": [ + "zhang499142409" + ] + } +] \ No newline at end of file diff --git a/backend/data/models.json b/backend/data/models.json new file mode 100644 index 0000000..67174f4 --- /dev/null +++ b/backend/data/models.json @@ -0,0 +1,11 @@ +[ + { + "id": "dee0443f-36f3-4d7c-9321-618d80c18a89", + "name": "千问", + "provider": "openai", + "api_key": "sk-85880595fc714d63bfd0b025e917bd26", + "base_url": "https://dashscope.aliyuncs.com/compatible-mode/v1", + "model_name": "qwen3.5-plus", + "is_current": true + } +] \ No newline at end of file diff --git a/backend/data/sync_messages.json b/backend/data/sync_messages.json index d85b330..3faaacc 100644 --- a/backend/data/sync_messages.json +++ b/backend/data/sync_messages.json @@ -1174,5 +1174,314 @@ "new_msg_id": 4819003726112313030 }, "type": "message" + }, + { + "direction": "out", + "ToUserName": "zhang499142409", + "Content": "你好吗?", + "CreateTime": 1773162081, + "key": "HBpEnbtj9BJZ" + }, + { + "key": "HBpEnbtj9BJZ", + "message": { + "msg_id": 1610150761, + "from_user_name": { + "str": "zhang499142409" + }, + "to_user_name": { + "str": "wxid_f2q8xscgg31322" + }, + "msg_type": 1, + "content": { + "str": "你是谁" + }, + "status": 3, + "img_status": 1, + "img_buf": { + "len": 0 + }, + "create_time": 1773163138, + "msg_source": "\n\t0\n\t1\n\t1\n\tN0_V1_tvPQ/7y0|v1_SpiyYTgw\n\t\n\t\t\n\t\n\n", + "push_content": "Daniel : 你是谁", + "new_msg_id": 3512349988965098431 + }, + "type": "message" + }, + { + "key": "HBpEnbtj9BJZ", + "message": { + "msg_id": 1826119229, + "from_user_name": { + "str": "wxid_f2q8xscgg31322" + }, + "to_user_name": { + "str": "wxid_f2q8xscgg31322" + }, + "msg_type": 51, + "content": { + "str": "\n\nwxid_f2q8xscgg31322\nlastMessage\n{\"messageSvrId\":\"9191983264673337867\",\"MsgCreateTime\":\"1773161823\"}\n\n" + }, + "status": 3, + "img_status": 1, + "img_buf": { + "len": 0 + }, + "create_time": 1773163203, + "msg_source": "\n\tv1_eSSKf/rE\n\t\n\t\t\n\t\n\n", + "new_msg_id": 3552106780167326835 + }, + "type": "message" + }, + { + "key": "HBpEnbtj9BJZ", + "message": { + "msg_id": 216882921, + "from_user_name": { + "str": "wxid_f2q8xscgg31322" + }, + "to_user_name": { + "str": "zhang499142409" + }, + "msg_type": 51, + "content": { + "str": "\n\nzhang499142409\nlastMessage\n{\"messageSvrId\":\"3512349988965098431\",\"MsgCreateTime\":\"1773163138\"}\n\n" + }, + "status": 3, + "img_status": 1, + "img_buf": { + "len": 0 + }, + "create_time": 1773163205, + "msg_source": "\n\tv1_L2fvIOvF\n\t\n\t\t\n\t\n\n", + "new_msg_id": 8434304876441850640 + }, + "type": "message" + }, + { + "key": "HBpEnbtj9BJZ", + "message": { + "msg_id": 2132745747, + "from_user_name": { + "str": "wxid_f2q8xscgg31322" + }, + "to_user_name": { + "str": "zhang499142409" + }, + "msg_type": 51, + "content": { + "str": "\n\nzhang499142409\nlastMessage\n{\"messageSvrId\":\"3512349988965098431\",\"MsgCreateTime\":\"1773163138\"}\n\n" + }, + "status": 3, + "img_status": 1, + "img_buf": { + "len": 0 + }, + "create_time": 1773163205, + "msg_source": "\n\tv1_DG+ZFs7h\n\t\n\t\t\n\t\n\n", + "new_msg_id": 2869033315579360891 + }, + "type": "message" + }, + { + "key": "HBpEnbtj9BJZ", + "message": { + "msg_id": 2110142296, + "from_user_name": { + "str": "wxid_f2q8xscgg31322" + }, + "to_user_name": { + "str": "zhang499142409" + }, + "msg_type": 51, + "content": { + "str": "\n\nzhang499142409\nlastMessage\n{\"messageSvrId\":\"3512349988965098431\",\"MsgCreateTime\":\"1773163138\"}\n\n" + }, + "status": 3, + "img_status": 1, + "img_buf": { + "len": 0 + }, + "create_time": 1773163291, + "msg_source": "\n\tv1_XmjXzlCu\n\t\n\t\t\n\t\n\n", + "new_msg_id": 6554530052967632446 + }, + "type": "message" + }, + { + "key": "HBpEnbtj9BJZ", + "message": { + "msg_id": 1891079631, + "from_user_name": { + "str": "wxid_f2q8xscgg31322" + }, + "to_user_name": { + "str": "zhang499142409" + }, + "msg_type": 51, + "content": { + "str": "\n\nzhang499142409\nlastMessage\n{\"messageSvrId\":\"3512349988965098431\",\"MsgCreateTime\":\"1773163138\"}\n\n" + }, + "status": 3, + "img_status": 1, + "img_buf": { + "len": 0 + }, + "create_time": 1773163294, + "msg_source": "\n\tv1_iRwWbu7A\n\t\n\t\t\n\t\n\n", + "new_msg_id": 6757624217414248141 + }, + "type": "message" + }, + { + "key": "HBpEnbtj9BJZ", + "message": { + "msg_id": 58087331, + "from_user_name": { + "str": "wxid_f2q8xscgg31322" + }, + "to_user_name": { + "str": "wxid_f2q8xscgg31322" + }, + "msg_type": 51, + "content": { + "str": "\n\nwxid_f2q8xscgg31322\nlastMessage\n{\"messageSvrId\":\"9191983264673337867\",\"MsgCreateTime\":\"1773161823\"}\n\n" + }, + "status": 3, + "img_status": 1, + "img_buf": { + "len": 0 + }, + "create_time": 1773163296, + "msg_source": "\n\tv1_IFm3SM7Y\n\t\n\t\t\n\t\n\n", + "new_msg_id": 1302874624611387202 + }, + "type": "message" + }, + { + "key": "HBpEnbtj9BJZ", + "message": { + "msg_id": 647268517, + "from_user_name": { + "str": "wxid_f2q8xscgg31322" + }, + "to_user_name": { + "str": "wxid_f2q8xscgg31322" + }, + "msg_type": 1, + "content": { + "str": "你用的什么模型" + }, + "status": 3, + "img_status": 1, + "img_buf": { + "len": 0 + }, + "create_time": 1773163308, + "msg_source": "\n\t0\n\t1\n\t1\n\tN0_V1_/m2bkvRf|v1_1FCs6fvq\n\t\n\t\t\n\t\n\n", + "new_msg_id": 8354732942085133458 + }, + "type": "message" + }, + { + "key": "HBpEnbtj9BJZ", + "message": { + "msg_id": 771150200, + "from_user_name": { + "str": "wxid_f2q8xscgg31322" + }, + "to_user_name": { + "str": "wxid_f2q8xscgg31322" + }, + "msg_type": 51, + "content": { + "str": "\n\nwxid_f2q8xscgg31322\nlastMessage\n{\"messageSvrId\":\"8354732942085133458\",\"MsgCreateTime\":\"1773163308\"}\n\n" + }, + "status": 3, + "img_status": 1, + "img_buf": { + "len": 0 + }, + "create_time": 1773163310, + "msg_source": "\n\tv1_2hC615We\n\t\n\t\t\n\t\n\n", + "new_msg_id": 7243733440829071694 + }, + "type": "message" + }, + { + "key": "HBpEnbtj9BJZ", + "message": { + "msg_id": 317539696, + "from_user_name": { + "str": "wxid_f2q8xscgg31322" + }, + "to_user_name": { + "str": "wxid_f2q8xscgg31322" + }, + "msg_type": 51, + "content": { + "str": "\n\nwxid_f2q8xscgg31322\nlastMessage\n{\"messageSvrId\":\"8354732942085133458\",\"MsgCreateTime\":\"1773163308\"}\n\n" + }, + "status": 3, + "img_status": 1, + "img_buf": { + "len": 0 + }, + "create_time": 1773163312, + "msg_source": "\n\tv1_1PuNB2Q3\n\t\n\t\t\n\t\n\n", + "new_msg_id": 7265708667374985818 + }, + "type": "message" + }, + { + "key": "HBpEnbtj9BJZ", + "message": { + "msg_id": 106384113, + "from_user_name": { + "str": "wxid_f2q8xscgg31322" + }, + "to_user_name": { + "str": "wxid_f2q8xscgg31322" + }, + "msg_type": 51, + "content": { + "str": "\n\nwxid_f2q8xscgg31322\nlastMessage\n{\"messageSvrId\":\"8354732942085133458\",\"MsgCreateTime\":\"1773163308\"}\n\n" + }, + "status": 3, + "img_status": 1, + "img_buf": { + "len": 0 + }, + "create_time": 1773163320, + "msg_source": "\n\tv1_rWs/fWf2\n\t\n\t\t\n\t\n\n", + "new_msg_id": 1510294059264702492 + }, + "type": "message" + }, + { + "key": "HBpEnbtj9BJZ", + "message": { + "msg_id": 576932746, + "from_user_name": { + "str": "zhang499142409" + }, + "to_user_name": { + "str": "wxid_f2q8xscgg31322" + }, + "msg_type": 1, + "content": { + "str": "告诉我模型内容" + }, + "status": 3, + "img_status": 1, + "img_buf": { + "len": 0 + }, + "create_time": 1773163339, + "msg_source": "\n\t0\n\t1\n\t1\n\tN0_V1_8uaj8gCr|v1_Flh4iaN8\n\t\n\t\t\n\t\n\n", + "push_content": "Daniel : 告诉我模型内容", + "new_msg_id": 6612157681502055018 + }, + "type": "message" } ] \ No newline at end of file diff --git a/backend/data/wechat.db b/backend/data/wechat.db new file mode 100644 index 0000000..25ba87f Binary files /dev/null and b/backend/data/wechat.db differ diff --git a/backend/db.py b/backend/db.py new file mode 100644 index 0000000..9dfd867 --- /dev/null +++ b/backend/db.py @@ -0,0 +1,179 @@ +# -*- coding: utf-8 -*- +"""SQLite 数据库:表结构初始化与连接,数据目录由 DATA_DIR 决定(可挂载到宿主机)。""" +import json +import os +import sqlite3 + +_DATA_DIR = os.getenv("DATA_DIR") or os.path.join(os.path.dirname(__file__), "data") +_DB_PATH = os.path.join(_DATA_DIR, "wechat.db") + +def get_db_path() -> str: + return _DB_PATH + +def get_conn() -> sqlite3.Connection: + os.makedirs(_DATA_DIR, exist_ok=True) + conn = sqlite3.connect(_DB_PATH, check_same_thread=False) + conn.row_factory = sqlite3.Row + return conn + +def init_schema(conn: sqlite3.Connection) -> None: + cur = conn.cursor() + # 客户档案 + cur.execute(""" + CREATE TABLE IF NOT EXISTS customers ( + id TEXT PRIMARY KEY, + key TEXT NOT NULL, + wxid TEXT NOT NULL, + remark_name TEXT, + region TEXT, + age TEXT, + gender TEXT, + level TEXT, + tags TEXT + ) + """) + cur.execute("CREATE INDEX IF NOT EXISTS idx_customers_key ON customers(key)") + # 定时问候任务 + cur.execute(""" + CREATE TABLE IF NOT EXISTS greeting_tasks ( + id TEXT PRIMARY KEY, + key TEXT NOT NULL, + name TEXT, + send_time TEXT, + customer_tags TEXT, + template TEXT, + use_qwen INTEGER DEFAULT 0, + enabled INTEGER DEFAULT 1, + executed_at TEXT + ) + """) + cur.execute("CREATE INDEX IF NOT EXISTS idx_greeting_tasks_key ON greeting_tasks(key)") + # 商品标签 + cur.execute(""" + CREATE TABLE IF NOT EXISTS product_tags ( + id TEXT PRIMARY KEY, + key TEXT NOT NULL, + name TEXT + ) + """) + # 推送群组 + cur.execute(""" + CREATE TABLE IF NOT EXISTS push_groups ( + id TEXT PRIMARY KEY, + key TEXT NOT NULL, + name TEXT, + customer_ids TEXT, + tag_ids TEXT + ) + """) + # 推送任务 + cur.execute(""" + CREATE TABLE IF NOT EXISTS push_tasks ( + id TEXT PRIMARY KEY, + key TEXT NOT NULL, + product_tag_id TEXT, + group_id TEXT, + content TEXT, + send_at TEXT, + status TEXT, + created_at TEXT + ) + """) + # 同步消息(WS 拉取 + 发出记录) + cur.execute(""" + CREATE TABLE IF NOT EXISTS sync_messages ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + key TEXT NOT NULL, + create_time INTEGER DEFAULT 0, + payload TEXT + ) + """) + cur.execute("CREATE INDEX IF NOT EXISTS idx_sync_messages_key ON sync_messages(key)") + # 模型配置 + cur.execute(""" + CREATE TABLE IF NOT EXISTS models ( + id TEXT PRIMARY KEY, + name TEXT, + provider TEXT, + api_key TEXT, + base_url TEXT, + model_name TEXT, + is_current INTEGER DEFAULT 0 + ) + """) + # AI 回复配置(白名单 / 超级管理员) + cur.execute(""" + CREATE TABLE IF NOT EXISTS ai_reply_config ( + key TEXT PRIMARY KEY, + super_admin_wxids TEXT, + whitelist_wxids TEXT + ) + """) + conn.commit() + _migrate_json_if_needed(conn) + + +def _migrate_json_if_needed(conn: sqlite3.Connection) -> None: + """若表为空且存在同名 JSON 文件,则从 JSON 迁移一次。""" + cur = conn.cursor() + # (table, filename, columns, json_columns) + tables_files = [ + ("customers", "customers.json", ["id", "key", "wxid", "remark_name", "region", "age", "gender", "level", "tags"], ["tags"]), + ("greeting_tasks", "greeting_tasks.json", ["id", "key", "name", "send_time", "customer_tags", "template", "use_qwen", "enabled", "executed_at"], ["customer_tags"]), + ("product_tags", "product_tags.json", ["id", "key", "name"], []), + ("push_groups", "push_groups.json", ["id", "key", "name", "customer_ids", "tag_ids"], ["customer_ids", "tag_ids"]), + ("push_tasks", "push_tasks.json", ["id", "key", "product_tag_id", "group_id", "content", "send_at", "status", "created_at"], []), + ("models", "models.json", ["id", "name", "provider", "api_key", "base_url", "model_name", "is_current"], []), + ("ai_reply_config", "ai_reply_config.json", ["key", "super_admin_wxids", "whitelist_wxids"], ["super_admin_wxids", "whitelist_wxids"]), + ] + for table, filename, columns, json_cols in tables_files: + json_cols_set = set(json_cols) + cur.execute(f"SELECT COUNT(*) FROM {table}") + if cur.fetchone()[0] > 0: + continue + path = os.path.join(_DATA_DIR, filename) + if not os.path.isfile(path): + continue + try: + with open(path, "r", encoding="utf-8") as f: + rows = json.load(f) + except Exception: + continue + if not rows: + continue + for r in rows: + if not isinstance(r, dict): + continue + vals = [] + for c in columns: + v = r.get(c) + if c in json_cols_set and isinstance(v, (list, dict)): + v = json.dumps(v, ensure_ascii=False) + elif isinstance(v, bool): + v = 1 if v else 0 + vals.append(v) + placeholders = ",".join("?" * len(columns)) + cur.execute(f"INSERT OR IGNORE INTO {table} ({','.join(columns)}) VALUES ({placeholders})", vals) + # sync_messages: 按 key + payload 迁移 + cur.execute("SELECT COUNT(*) FROM sync_messages") + if cur.fetchone()[0] == 0: + path = os.path.join(_DATA_DIR, "sync_messages.json") + if os.path.isfile(path): + try: + with open(path, "r", encoding="utf-8") as f: + rows = json.load(f) + for r in rows: + if not isinstance(r, dict): + continue + key = r.get("key", "") + ct = int(r.get("CreateTime") or 0) if isinstance(r.get("CreateTime"), (int, float)) else 0 + cur.execute("INSERT INTO sync_messages (key, create_time, payload) VALUES (?,?,?)", (key, ct, json.dumps(r, ensure_ascii=False))) + except Exception: + pass + conn.commit() + + +def _conn(): + c = get_conn() + init_schema(c) + return c diff --git a/backend/main.py b/backend/main.py index 1f53b3f..964671a 100644 --- a/backend/main.py +++ b/backend/main.py @@ -28,6 +28,13 @@ SLIDER_VERIFY_BASE_URL = os.getenv("SLIDER_VERIFY_BASE_URL", "http://113.44.162. SLIDER_VERIFY_KEY = os.getenv("SLIDER_VERIFY_KEY", os.getenv("KEY", "408449830")) # 发送文本消息:swagger 中为 POST /message/SendTextMessage,body 为 SendMessageModel(MsgItem 数组) SEND_MSG_PATH = (os.getenv("SEND_MSG_PATH") or "/message/SendTextMessage").strip() +# 发送图片消息:部分上游为独立接口,或与文本同 path 仅 MsgType 不同(如 3=图片) +SEND_IMAGE_PATH = (os.getenv("SEND_IMAGE_PATH") or "").strip() or SEND_MSG_PATH +# 联系人列表:7006 为 POST /friend/GetContactList,body 传 CurrentChatRoomContactSeq/CurrentWxcontactSeq=0 +CONTACT_LIST_PATH = (os.getenv("CONTACT_LIST_PATH") or os.getenv("FRIEND_LIST_PATH") or "/friend/GetContactList").strip() +FRIEND_LIST_PATH = (os.getenv("FRIEND_LIST_PATH") or CONTACT_LIST_PATH).strip() +# 图片消息 MsgType:部分上游为 0,常见为 3 +IMAGE_MSG_TYPE = int(os.getenv("IMAGE_MSG_TYPE", "3")) # 按 key 缓存取码结果与 Data62,供后续步骤使用 qrcode_store: dict = {} @@ -39,15 +46,103 @@ logging.basicConfig( logger = logging.getLogger("wechat-backend") +def _is_self_sent(msg: dict) -> bool: + """判断是否为当前账号自己发出的消息(则不由 AI 回复)。""" + if msg.get("direction") == "out": + return True + if msg.get("IsSelf") in (1, True, "1"): + return True + return False + + +def _allowed_ai_reply(key: str, from_user: str) -> bool: + """分级处理:仅超级管理员或白名单内的联系人可获得 AI 回复,其他一律不回复。""" + if not from_user or not from_user.strip(): + return False + cfg = store.get_ai_reply_config(key) + if not cfg: + return False + super_admins = set(cfg.get("super_admin_wxids") or []) + whitelist = set(cfg.get("whitelist_wxids") or []) + return from_user.strip() in super_admins or from_user.strip() in whitelist + + +async def _ai_takeover_reply(key: str, from_user: str, content: str) -> None: + """收到他人消息时由 AI 接管:生成回复并发送。""" + if not from_user or not content or not content.strip(): + return + try: + recent = store.list_sync_messages(key, limit=10) + # 仅取与该用户的最近几条作为上下文(简化:只取最后几条) + context = [] + for m in reversed(recent): + c = (m.get("Content") or m.get("content") or "").strip() + if not c: + continue + if m.get("direction") == "out" and (m.get("ToUserName") or "").strip() == from_user: + context.append({"role": "assistant", "content": c}) + elif (m.get("FromUserName") or m.get("from") or "").strip() == from_user and not _is_self_sent(m): + context.append({"role": "user", "content": c}) + if len(context) >= 6: + break + if not context or context[-1].get("role") != "user": + context.append({"role": "user", "content": content}) + text = await llm_chat(context) + if text and text.strip(): + await _send_message_upstream(key, from_user, text.strip()) + logger.info("AI takeover replied to %s: %s", from_user[:20], text.strip()[:50]) + except Exception as e: + logger.exception("AI takeover reply error (from=%s): %s", from_user, e) + + def _on_ws_message(key: str, data: dict) -> None: - """GetSyncMsg 收到数据时:写入 store,便于前端拉取与自动回复逻辑使用。""" + """GetSyncMsg 收到数据时:写入 store;若为他人消息则 AI 接管对话。""" msg_list = data.get("MsgList") or data.get("List") or data.get("msgList") if isinstance(msg_list, list) and msg_list: store.append_sync_messages(key, msg_list) + for m in msg_list: + if _is_self_sent(m): + continue + from_user = (m.get("FromUserName") or m.get("from") or "").strip() + content = (m.get("Content") or m.get("content") or "").strip() + msg_type = m.get("MsgType") or m.get("msgType") + if from_user and content and (msg_type in (1, None) or str(msg_type) == "1"): # 仅文本触发 AI + if not _allowed_ai_reply(key, from_user): + continue + try: + asyncio.get_running_loop().create_task(_ai_takeover_reply(key, from_user, content)) + except RuntimeError: + pass elif isinstance(data, list): store.append_sync_messages(key, data) + for m in data: + if not isinstance(m, dict) or _is_self_sent(m): + continue + from_user = (m.get("FromUserName") or m.get("from") or "").strip() + content = (m.get("Content") or m.get("content") or "").strip() + msg_type = m.get("MsgType") or m.get("msgType") + if from_user and content and (msg_type in (1, None) or str(msg_type) == "1"): + if not _allowed_ai_reply(key, from_user): + continue + try: + asyncio.get_running_loop().create_task(_ai_takeover_reply(key, from_user, content)) + except RuntimeError: + pass else: store.append_sync_messages(key, [data]) + m = data if isinstance(data, dict) else {} + if not _is_self_sent(m): + from_user = (m.get("FromUserName") or m.get("from") or "").strip() + content = (m.get("Content") or m.get("content") or "").strip() + msg_type = m.get("MsgType") or m.get("msgType") + if from_user and content and (msg_type in (1, None) or str(msg_type) == "1"): + if not _allowed_ai_reply(key, from_user): + pass + else: + try: + asyncio.get_running_loop().create_task(_ai_takeover_reply(key, from_user, content)) + except RuntimeError: + pass async def _run_greeting_scheduler() -> None: @@ -409,6 +504,24 @@ class SendMessageBody(BaseModel): content: str +class BatchSendItem(BaseModel): + to_user_name: str + content: str + + +class BatchSendBody(BaseModel): + key: str + items: List[BatchSendItem] + + +class SendImageBody(BaseModel): + key: str + to_user_name: str + image_content: str # 图片 base64 或 URL,依上游约定 + text_content: Optional[str] = "" + at_wxid_list: Optional[List[str]] = None + + class QwenGenerateBody(BaseModel): prompt: str system: Optional[str] = None @@ -588,6 +701,65 @@ async def _send_message_upstream(key: str, to_user_name: str, content: str) -> d return {"ok": True, "raw": resp.text[:500]} +async def _send_batch_upstream(key: str, items: List[dict]) -> dict: + """批量发送:一次请求多个 MsgItem,快速分发。""" + url = f"{WECHAT_UPSTREAM_BASE_URL.rstrip('/')}{SEND_MSG_PATH}" + msg_items = [] + for it in items: + to_user = (it.get("to_user_name") or it.get("ToUserName") or "").strip() + content = (it.get("content") or it.get("TextContent") or "").strip() + if not to_user: + continue + msg_items.append({"ToUserName": to_user, "MsgType": 1, "TextContent": content}) + if not msg_items: + raise HTTPException(status_code=400, detail="items 中至少需要一条有效 to_user_name 与 content") + payload = {"MsgItem": msg_items} + async with httpx.AsyncClient(timeout=30.0) as client: + resp = await client.post(url, params={"key": key}, json=payload) + if resp.status_code >= 400: + body_preview = resp.text[:400] if resp.text else "" + logger.warning("Batch send upstream %s: %s", resp.status_code, body_preview) + raise HTTPException( + status_code=502, + detail=f"upstream_returned_{resp.status_code}: {body_preview}", + ) + for it in msg_items: + store.append_sent_message(key, it["ToUserName"], it.get("TextContent", "")) + try: + return resp.json() + except Exception: + return {"ok": True, "sent": len(msg_items), "raw": resp.text[:500]} + + +async def _send_image_upstream(key: str, to_user_name: str, image_content: str, + text_content: Optional[str] = "", + at_wxid_list: Optional[List[str]] = None) -> dict: + """发送图片消息:MsgItem 含 ImageContent、MsgType=3(或 0,依上游),可选 TextContent、AtWxIDList。""" + url = f"{WECHAT_UPSTREAM_BASE_URL.rstrip('/')}{SEND_IMAGE_PATH}" + item = { + "ToUserName": to_user_name, + "MsgType": IMAGE_MSG_TYPE, + "ImageContent": image_content or "", + "TextContent": text_content or "", + "AtWxIDList": at_wxid_list or [], + } + payload = {"MsgItem": [item]} + async with httpx.AsyncClient(timeout=15.0) as client: + resp = await client.post(url, params={"key": key}, json=payload) + if resp.status_code >= 400: + body_preview = resp.text[:400] if resp.text else "" + logger.warning("Send image upstream %s: %s", resp.status_code, body_preview) + raise HTTPException( + status_code=502, + detail=f"upstream_returned_{resp.status_code}: {body_preview}", + ) + store.append_sent_message(key, to_user_name, "[图片]" + ((" " + text_content) if text_content else "")) + try: + return resp.json() + except Exception: + return {"ok": True, "raw": resp.text[:500]} + + @app.post("/api/send-message") async def api_send_message(body: SendMessageBody): try: @@ -599,6 +771,161 @@ async def api_send_message(body: SendMessageBody): raise HTTPException(status_code=502, detail=f"upstream_error: {exc}") from exc +@app.post("/api/send-batch") +async def api_send_batch(body: BatchSendBody): + """快速群发:一次请求批量发送给多人,支持从好友/客户列表选择后调用。""" + items = [{"to_user_name": it.to_user_name, "content": it.content} for it in body.items] + try: + return await _send_batch_upstream(body.key, items) + except HTTPException: + raise + except Exception as exc: + logger.exception("Batch send error: %s", exc) + raise HTTPException(status_code=502, detail=f"upstream_error: {exc}") from exc + + +@app.post("/api/send-image") +async def api_send_image(body: SendImageBody): + """发送图片消息快捷方式,参数对应 MsgItem:ImageContent、TextContent、ToUserName、AtWxIDList。""" + try: + return await _send_image_upstream( + body.key, + body.to_user_name, + body.image_content, + text_content=body.text_content or "", + at_wxid_list=body.at_wxid_list, + ) + except HTTPException: + raise + except Exception as exc: + logger.exception("Send image error: %s", exc) + raise HTTPException(status_code=502, detail=f"upstream_error: {exc}") from exc + + +def _normalize_contact_list(raw: Any) -> List[dict]: + """将上游 GetContactList 多种返回格式统一为 [ { wxid, remark_name, ... } ]。""" + items = [] + if isinstance(raw, list): + items = raw + elif isinstance(raw, dict): + data = raw.get("Data") or raw.get("data") or raw + if isinstance(data, list): + items = data + elif isinstance(data, dict): + items = ( + data.get("ContactList") + or data.get("contactList") + or data.get("WxcontactList") + or data.get("wxcontactList") + or data.get("CachedContactList") + or data.get("List") + or data.get("list") + or data.get("items") + or [] + ) + items = items or raw.get("items") or raw.get("list") or raw.get("List") or [] + result = [] + for x in items: + if not isinstance(x, dict): + continue + wxid = ( + x.get("wxid") + or x.get("Wxid") + or x.get("UserName") + or x.get("userName") + or x.get("Alias") + or "" + ) + remark = ( + x.get("remark_name") + or x.get("RemarkName") + or x.get("NickName") + or x.get("nickName") + or x.get("DisplayName") + or wxid + ) + result.append({"wxid": wxid, "remark_name": remark, **{k: v for k, v in x.items() if k not in ("wxid", "Wxid", "remark_name", "RemarkName")}}) + return result + + +# 上游 GetContactList 请求体:CurrentChatRoomContactSeq、CurrentWxcontactSeq 传 0 表示拉取全量 +GET_CONTACT_LIST_BODY = {"CurrentChatRoomContactSeq": 0, "CurrentWxcontactSeq": 0} + + +@app.get("/api/contact-list") +async def api_contact_list(key: str = Query(..., description="账号 key")): + """获取全部联系人:POST 上游,body 为 CurrentChatRoomContactSeq/CurrentWxcontactSeq=0,key 走 query。""" + base = WECHAT_UPSTREAM_BASE_URL.rstrip("/") + path = CONTACT_LIST_PATH if CONTACT_LIST_PATH.startswith("/") else f"/{CONTACT_LIST_PATH}" + url = f"{base}{path}" + try: + async with httpx.AsyncClient(timeout=15.0) as client: + resp = await client.post( + url, + params={"key": key}, + json=GET_CONTACT_LIST_BODY, + ) + if resp.status_code >= 400: + logger.warning("GetContactList %s: %s", resp.status_code, resp.text[:200]) + return {"items": [], "error": resp.text[:200]} + raw = resp.json() + # 日志便于确认 7006 返回结构(不打印完整列表) + if isinstance(raw, dict): + data = raw.get("Data") or raw.get("data") + data_keys = list(data.keys()) if isinstance(data, dict) else getattr(data, "__name__", type(data).__name__) + logger.info("GetContactList response keys: raw=%s, Data=%s", list(raw.keys()), data_keys) + items = _normalize_contact_list(raw) + if not items and isinstance(raw, dict): + items = _normalize_contact_list(raw.get("Data") or raw.get("data") or raw) + logger.info("GetContactList normalized items count: %s", len(items)) + return {"items": items} + except Exception as e: + logger.warning("GetContactList error: %s", e) + return {"items": [], "error": str(e)} + + +@app.get("/api/friends") +async def api_list_friends(key: str = Query(..., description="账号 key")): + """好友列表:代理上游联系人接口,与 /api/contact-list 同源;否则返回客户档案。""" + return await api_contact_list(key) + + +def _friends_fallback(key: str) -> List[dict]: + """用客户档案作为可选联系人,便于在管理页选择群发对象。""" + customers = store.list_customers(key) + return [ + {"wxid": c.get("wxid"), "remark_name": c.get("remark_name") or c.get("wxid"), "id": c.get("id")} + for c in customers + if c.get("wxid") + ] + + +# ---------- AI 接管回复配置(白名单 + 超级管理员) ---------- +class AIReplyConfigUpdate(BaseModel): + key: str + super_admin_wxids: Optional[List[str]] = None + whitelist_wxids: Optional[List[str]] = None + + +@app.get("/api/ai-reply-config") +async def api_get_ai_reply_config(key: str = Query(..., description="账号 key")): + """获取当前账号的 AI 回复配置:超级管理员与白名单 wxid 列表。""" + cfg = store.get_ai_reply_config(key) + if not cfg: + return {"key": key, "super_admin_wxids": [], "whitelist_wxids": []} + return cfg + + +@app.patch("/api/ai-reply-config") +async def api_update_ai_reply_config(body: AIReplyConfigUpdate): + """设置 AI 回复白名单与超级管理员:仅列表内联系人会收到 AI 自动回复。""" + return store.update_ai_reply_config( + body.key, + super_admin_wxids=body.super_admin_wxids, + whitelist_wxids=body.whitelist_wxids, + ) + + # ---------- 模型管理(多模型切换,API Key 按模型配置) ---------- class ModelCreate(BaseModel): name: str diff --git a/backend/store.py b/backend/store.py index 484e326..b003633 100644 --- a/backend/store.py +++ b/backend/store.py @@ -1,305 +1,437 @@ # -*- coding: utf-8 -*- -"""JSON 文件存储:客户档案、定时问候任务、商品标签、推送群组、推送任务、同步消息。""" +"""数据库存储:客户档案、定时问候、商品标签、推送群组/任务、同步消息、模型、AI 回复配置。使用 SQLite,便于增删改查。""" import json -import os import threading +import time import uuid from typing import Any, Dict, List, Optional -_DATA_DIR = os.path.join(os.path.dirname(__file__), "data") +try: + from backend import db +except ImportError: + import db + _LOCK = threading.Lock() -def _path(name: str) -> str: - os.makedirs(_DATA_DIR, exist_ok=True) - return os.path.join(_DATA_DIR, f"{name}.json") +def _conn(): + conn = db.get_conn() + db.init_schema(conn) + return conn + +def _row_to_dict(row) -> dict: + if row is None: + return {} + d = dict(row) + out = {} + for k, v in d.items(): + if k in ("tags", "customer_tags", "customer_ids", "tag_ids", "super_admin_wxids", "whitelist_wxids") and isinstance(v, str): + try: + out[k] = json.loads(v) if v else [] + except Exception: + out[k] = [] + elif k in ("use_qwen", "enabled", "is_current") and v is not None: + out[k] = bool(v) + else: + out[k] = v + return out -def _load(name: str) -> list: - with _LOCK: - p = _path(name) - if not os.path.exists(p): - return [] - try: - with open(p, "r", encoding="utf-8") as f: - return json.load(f) - except Exception: - return [] - - -def _save(name: str, data: list) -> None: - with _LOCK: - p = _path(name) - with open(p, "w", encoding="utf-8") as f: - json.dump(data, f, ensure_ascii=False, indent=2) - - -# ---------- 客户档案 R1-2 ---------- +# ---------- 客户档案 ---------- def list_customers(key: Optional[str] = None) -> List[Dict]: - """key: 微信 key,若传则只返回该 key 下的客户。""" - rows = _load("customers") - if key: - rows = [r for r in rows if r.get("key") == key] - return sorted(rows, key=lambda x: (x.get("remark_name") or x.get("wxid") or "")) - + with _LOCK: + conn = _conn() + try: + if key: + cur = conn.execute("SELECT * FROM customers WHERE key = ? ORDER BY remark_name, wxid", (key,)) + else: + cur = conn.execute("SELECT * FROM customers ORDER BY remark_name, wxid") + return [_row_to_dict(r) for r in cur.fetchall()] + finally: + conn.close() def get_customer(customer_id: str) -> Optional[Dict]: - rows = _load("customers") - for r in rows: - if r.get("id") == customer_id: - return r - return None - + with _LOCK: + conn = _conn() + try: + cur = conn.execute("SELECT * FROM customers WHERE id = ?", (customer_id,)) + row = cur.fetchone() + return _row_to_dict(row) if row else None + finally: + conn.close() def upsert_customer(key: str, wxid: str, remark_name: str = "", region: str = "", age: str = "", gender: str = "", level: str = "", tags: Optional[List[str]] = None, customer_id: Optional[str] = None) -> Dict: - """拿货等级 level;tags 为标签列表,用于分群与问候。""" - rows = _load("customers") - if tags is None: - tags = [] + tags = tags or [] rid = customer_id or str(uuid.uuid4()) - for r in rows: - if r.get("id") == rid or (r.get("key") == key and r.get("wxid") == wxid and not customer_id): - r.update({ - "key": key, "wxid": wxid, "remark_name": remark_name, "region": region, - "age": age, "gender": gender, "level": level, "tags": tags, - }) - _save("customers", rows) - return r - new_row = { - "id": rid, "key": key, "wxid": wxid, "remark_name": remark_name, - "region": region, "age": age, "gender": gender, "level": level, "tags": tags, - } - rows.append(new_row) - _save("customers", rows) - return new_row - + tags_json = json.dumps(tags, ensure_ascii=False) + with _LOCK: + conn = _conn() + try: + if customer_id: + conn.execute( + "UPDATE customers SET key=?, wxid=?, remark_name=?, region=?, age=?, gender=?, level=?, tags=? WHERE id=?", + (key, wxid, remark_name, region, age, gender, level, tags_json, customer_id) + ) + conn.commit() + cur = conn.execute("SELECT * FROM customers WHERE id = ?", (customer_id,)) + return _row_to_dict(cur.fetchone()) + cur = conn.execute("SELECT id FROM customers WHERE key = ? AND wxid = ?", (key, wxid)) + row = cur.fetchone() + if row: + conn.execute( + "UPDATE customers SET remark_name=?, region=?, age=?, gender=?, level=?, tags=? WHERE id=?", + (remark_name, region, age, gender, level, tags_json, row["id"]) + ) + conn.commit() + cur = conn.execute("SELECT * FROM customers WHERE id = ?", (row["id"],)) + return _row_to_dict(cur.fetchone()) + conn.execute( + "INSERT INTO customers (id, key, wxid, remark_name, region, age, gender, level, tags) VALUES (?,?,?,?,?,?,?,?,?)", + (rid, key, wxid, remark_name, region, age, gender, level, tags_json) + ) + conn.commit() + cur = conn.execute("SELECT * FROM customers WHERE id = ?", (rid,)) + return _row_to_dict(cur.fetchone()) + finally: + conn.close() def delete_customer(customer_id: str) -> bool: - rows = _load("customers") - for i, r in enumerate(rows): - if r.get("id") == customer_id: - rows.pop(i) - _save("customers", rows) - return True - return False - + with _LOCK: + conn = _conn() + try: + cur = conn.execute("DELETE FROM customers WHERE id = ?", (customer_id,)) + conn.commit() + return cur.rowcount > 0 + finally: + conn.close() def list_customer_tags(key: str) -> List[str]: - """返回该 key 下客户档案中出现过的所有标签(去重、排序)。""" - rows = [r for r in _load("customers") if r.get("key") == key] + rows = [r for r in list_customers(key=key) if r] tags_set = set() for r in rows: - for t in r.get("tags") or []: + for t in (r.get("tags") or []): if t and str(t).strip(): tags_set.add(str(t).strip()) return sorted(tags_set) -# ---------- 定时问候任务 R1-3 ---------- +# ---------- 定时问候任务 ---------- def list_greeting_tasks(key: Optional[str] = None) -> List[Dict]: - rows = _load("greeting_tasks") - if key: - rows = [r for r in rows if r.get("key") == key] - return sorted(rows, key=lambda x: x.get("send_time", "") or x.get("cron", "")) - + with _LOCK: + conn = _conn() + try: + if key: + cur = conn.execute("SELECT * FROM greeting_tasks WHERE key = ? ORDER BY send_time", (key,)) + else: + cur = conn.execute("SELECT * FROM greeting_tasks ORDER BY send_time") + return [_row_to_dict(r) for r in cur.fetchall()] + finally: + conn.close() def get_greeting_task(task_id: str) -> Optional[Dict]: - for r in _load("greeting_tasks"): - if r.get("id") == task_id: - return r - return None - + with _LOCK: + conn = _conn() + try: + cur = conn.execute("SELECT * FROM greeting_tasks WHERE id = ?", (task_id,)) + row = cur.fetchone() + return _row_to_dict(row) if row else None + finally: + conn.close() def create_greeting_task(key: str, name: str, send_time: str, customer_tags: List[str], - template: str, use_qwen: bool = False) -> Dict: + template: str, use_qwen: bool = False) -> Dict: rid = str(uuid.uuid4()) row = { "id": rid, "key": key, "name": name, "send_time": send_time, "customer_tags": customer_tags or [], "template": template, "use_qwen": use_qwen, "enabled": True, "executed_at": None, } - rows = _load("greeting_tasks") - rows.append(row) - _save("greeting_tasks", rows) - return row - + with _LOCK: + conn = _conn() + try: + conn.execute( + "INSERT INTO greeting_tasks (id, key, name, send_time, customer_tags, template, use_qwen, enabled, executed_at) VALUES (?,?,?,?,?,?,?,?,?)", + (rid, key, name, send_time, json.dumps(customer_tags or [], ensure_ascii=False), template, 1 if use_qwen else 0, 1, None) + ) + conn.commit() + return row + finally: + conn.close() def update_greeting_task(task_id: str, **kwargs) -> Optional[Dict]: - rows = _load("greeting_tasks") - for r in rows: - if r.get("id") == task_id: - for k, v in kwargs.items(): - if k in ("name", "send_time", "cron", "customer_tags", "template", "use_qwen", "enabled", "executed_at"): - r[k] = v - _save("greeting_tasks", rows) - return r - return None - + allowed = ("name", "send_time", "cron", "customer_tags", "template", "use_qwen", "enabled", "executed_at") + with _LOCK: + conn = _conn() + try: + cur = conn.execute("SELECT * FROM greeting_tasks WHERE id = ?", (task_id,)) + row = cur.fetchone() + if not row: + return None + updates = [] + params = [] + if "send_time" in kwargs: + updates.append("send_time = ?") + params.append(kwargs["send_time"]) + if "cron" in kwargs: + updates.append("send_time = ?") + params.append(kwargs["cron"]) + for k in ("name", "customer_tags", "template", "use_qwen", "enabled", "executed_at"): + if k in kwargs: + v = kwargs[k] + if k == "customer_tags": + updates.append("customer_tags = ?") + params.append(json.dumps(v, ensure_ascii=False)) + elif k == "use_qwen": + updates.append("use_qwen = ?") + params.append(1 if v else 0) + elif k == "enabled": + updates.append("enabled = ?") + params.append(1 if v else 0) + else: + updates.append(f"{k} = ?") + params.append(v) + if not updates: + return _row_to_dict(row) + params.append(task_id) + conn.execute(f"UPDATE greeting_tasks SET {', '.join(updates)} WHERE id = ?", params) + conn.commit() + cur = conn.execute("SELECT * FROM greeting_tasks WHERE id = ?", (task_id,)) + return _row_to_dict(cur.fetchone()) + finally: + conn.close() def delete_greeting_task(task_id: str) -> bool: - rows = _load("greeting_tasks") - for i, r in enumerate(rows): - if r.get("id") == task_id: - rows.pop(i) - _save("greeting_tasks", rows) - return True - return False + with _LOCK: + conn = _conn() + try: + cur = conn.execute("DELETE FROM greeting_tasks WHERE id = ?", (task_id,)) + conn.commit() + return cur.rowcount > 0 + finally: + conn.close() -# ---------- 商品标签 R1-4 ---------- +# ---------- 商品标签 ---------- def list_product_tags(key: Optional[str] = None) -> List[Dict]: - rows = _load("product_tags") - if key: - rows = [r for r in rows if r.get("key") == key] - return rows - + with _LOCK: + conn = _conn() + try: + if key: + cur = conn.execute("SELECT * FROM product_tags WHERE key = ?", (key,)) + else: + cur = conn.execute("SELECT * FROM product_tags") + return [_row_to_dict(r) for r in cur.fetchall()] + finally: + conn.close() def create_product_tag(key: str, name: str) -> Dict: rid = str(uuid.uuid4()) - row = {"id": rid, "key": key, "name": name} - rows = _load("product_tags") - rows.append(row) - _save("product_tags", rows) - return row - + with _LOCK: + conn = _conn() + try: + conn.execute("INSERT INTO product_tags (id, key, name) VALUES (?,?,?)", (rid, key, name)) + conn.commit() + cur = conn.execute("SELECT * FROM product_tags WHERE id = ?", (rid,)) + return _row_to_dict(cur.fetchone()) + finally: + conn.close() def delete_product_tag(tag_id: str) -> bool: - rows = _load("product_tags") - for i, r in enumerate(rows): - if r.get("id") == tag_id: - rows.pop(i) - _save("product_tags", rows) - return True - return False + with _LOCK: + conn = _conn() + try: + cur = conn.execute("DELETE FROM product_tags WHERE id = ?", (tag_id,)) + conn.commit() + return cur.rowcount > 0 + finally: + conn.close() -# ---------- 推送群组(客户群组) ---------- +# ---------- 推送群组 ---------- def list_push_groups(key: Optional[str] = None) -> List[Dict]: - rows = _load("push_groups") - if key: - rows = [r for r in rows if r.get("key") == key] - return rows - + with _LOCK: + conn = _conn() + try: + if key: + cur = conn.execute("SELECT * FROM push_groups WHERE key = ?", (key,)) + else: + cur = conn.execute("SELECT * FROM push_groups") + return [_row_to_dict(r) for r in cur.fetchall()] + finally: + conn.close() def create_push_group(key: str, name: str, customer_ids: List[str], tag_ids: List[str]) -> Dict: rid = str(uuid.uuid4()) - row = {"id": rid, "key": key, "name": name, "customer_ids": customer_ids or [], "tag_ids": tag_ids or []} - rows = _load("push_groups") - rows.append(row) - _save("push_groups", rows) - return row - + with _LOCK: + conn = _conn() + try: + conn.execute( + "INSERT INTO push_groups (id, key, name, customer_ids, tag_ids) VALUES (?,?,?,?,?)", + (rid, key, name, json.dumps(customer_ids or [], ensure_ascii=False), json.dumps(tag_ids or [], ensure_ascii=False)) + ) + conn.commit() + cur = conn.execute("SELECT * FROM push_groups WHERE id = ?", (rid,)) + return _row_to_dict(cur.fetchone()) + finally: + conn.close() def update_push_group(group_id: str, name: Optional[str] = None, customer_ids: Optional[List[str]] = None, tag_ids: Optional[List[str]] = None) -> Optional[Dict]: - rows = _load("push_groups") - for r in rows: - if r.get("id") == group_id: + with _LOCK: + conn = _conn() + try: + cur = conn.execute("SELECT * FROM push_groups WHERE id = ?", (group_id,)) + if cur.fetchone() is None: + return None + updates, params = [], [] if name is not None: - r["name"] = name + updates.append("name = ?") + params.append(name) if customer_ids is not None: - r["customer_ids"] = customer_ids + updates.append("customer_ids = ?") + params.append(json.dumps(customer_ids, ensure_ascii=False)) if tag_ids is not None: - r["tag_ids"] = tag_ids - _save("push_groups", rows) - return r - return None - + updates.append("tag_ids = ?") + params.append(json.dumps(tag_ids, ensure_ascii=False)) + if not updates: + cur = conn.execute("SELECT * FROM push_groups WHERE id = ?", (group_id,)) + return _row_to_dict(cur.fetchone()) + params.append(group_id) + conn.execute(f"UPDATE push_groups SET {', '.join(updates)} WHERE id = ?", params) + conn.commit() + cur = conn.execute("SELECT * FROM push_groups WHERE id = ?", (group_id,)) + return _row_to_dict(cur.fetchone()) + finally: + conn.close() def delete_push_group(group_id: str) -> bool: - rows = _load("push_groups") - for i, r in enumerate(rows): - if r.get("id") == group_id: - rows.pop(i) - _save("push_groups", rows) - return True - return False + with _LOCK: + conn = _conn() + try: + cur = conn.execute("DELETE FROM push_groups WHERE id = ?", (group_id,)) + conn.commit() + return cur.rowcount > 0 + finally: + conn.close() -# ---------- 推送任务(一键/定时发送) ---------- +# ---------- 推送任务 ---------- def list_push_tasks(key: Optional[str] = None, limit: int = 200) -> List[Dict]: - rows = _load("push_tasks") - if key: - rows = [r for r in rows if r.get("key") == key] - rows = sorted(rows, key=lambda x: x.get("created_at", ""), reverse=True) - return rows[:limit] - + with _LOCK: + conn = _conn() + try: + if key: + cur = conn.execute("SELECT * FROM push_tasks WHERE key = ? ORDER BY created_at DESC LIMIT ?", (key, limit)) + else: + cur = conn.execute("SELECT * FROM push_tasks ORDER BY created_at DESC LIMIT ?", (limit,)) + return [_row_to_dict(r) for r in cur.fetchall()] + finally: + conn.close() def create_push_task(key: str, product_tag_id: str, group_id: str, content: str, send_at: Optional[str] = None) -> Dict: rid = str(uuid.uuid4()) - import time - row = { - "id": rid, "key": key, "product_tag_id": product_tag_id, "group_id": group_id, - "content": content, "send_at": send_at, "status": "pending", - "created_at": time.strftime("%Y-%m-%dT%H:%M:%S"), - } - rows = _load("push_tasks") - rows.append(row) - _save("push_tasks", rows) - return row - + created = time.strftime("%Y-%m-%dT%H:%M:%S") + with _LOCK: + conn = _conn() + try: + conn.execute( + "INSERT INTO push_tasks (id, key, product_tag_id, group_id, content, send_at, status, created_at) VALUES (?,?,?,?,?,?,?,?)", + (rid, key, product_tag_id, group_id, content, send_at, "pending", created) + ) + conn.commit() + cur = conn.execute("SELECT * FROM push_tasks WHERE id = ?", (rid,)) + return _row_to_dict(cur.fetchone()) + finally: + conn.close() def update_push_task_status(task_id: str, status: str) -> Optional[Dict]: - rows = _load("push_tasks") - for r in rows: - if r.get("id") == task_id: - r["status"] = status - _save("push_tasks", rows) - return r - return None + with _LOCK: + conn = _conn() + try: + conn.execute("UPDATE push_tasks SET status = ? WHERE id = ?", (status, task_id)) + conn.commit() + cur = conn.execute("SELECT * FROM push_tasks WHERE id = ?", (task_id,)) + row = cur.fetchone() + return _row_to_dict(row) if row else None + finally: + conn.close() -# ---------- WS 同步消息(GetSyncMsg 结果) ---------- +# ---------- 同步消息 ---------- def append_sync_messages(key: str, messages: List[Dict], max_per_key: int = 500) -> None: - rows = _load("sync_messages") - for m in messages: - m["key"] = key - rows.append(m) - by_key: Dict[str, List[Dict]] = {} - for m in rows: - k = m.get("key", "") - by_key.setdefault(k, []).append(m) - new_rows = [] - for lst in by_key.values(): - new_rows.extend(lst[-max_per_key:]) - _save("sync_messages", new_rows) - + with _LOCK: + conn = _conn() + try: + for m in messages: + create_time = int(m.get("CreateTime") or 0) if isinstance(m.get("CreateTime"), (int, float)) else 0 + conn.execute("INSERT INTO sync_messages (key, create_time, payload) VALUES (?,?,?)", + (key, create_time, json.dumps(m, ensure_ascii=False))) + conn.commit() + # 每个 key 只保留最近 max_per_key 条 + cur = conn.execute("SELECT id FROM sync_messages WHERE key = ? ORDER BY create_time DESC", (key,)) + rows = cur.fetchall() + if len(rows) > max_per_key: + to_del = [r["id"] for r in rows[max_per_key:]] + placeholders = ",".join("?" * len(to_del)) + conn.execute(f"DELETE FROM sync_messages WHERE id IN ({placeholders})", to_del) + conn.commit() + finally: + conn.close() def list_sync_messages(key: str, limit: int = 100) -> List[Dict]: - rows = _load("sync_messages") - rows = [r for r in rows if r.get("key") == key] - # 统一按 CreateTime 排序(支持 int 时间戳与其它格式),新消息在前 - rows = sorted(rows, key=lambda x: int(x.get("CreateTime") or 0) if isinstance(x.get("CreateTime"), (int, float)) else 0, reverse=True) - return rows[:limit] - + with _LOCK: + conn = _conn() + try: + cur = conn.execute( + "SELECT payload FROM sync_messages WHERE key = ? ORDER BY create_time DESC LIMIT ?", + (key, limit) + ) + rows = cur.fetchall() + out = [] + for r in rows: + try: + out.append(json.loads(r["payload"])) + except Exception: + pass + return out + finally: + conn.close() def append_sent_message(key: str, to_user_name: str, content: str) -> None: - """发送消息成功后写入一条「发出」记录,便于在实时消息页展示完整对话。""" - import time append_sync_messages(key, [{"direction": "out", "ToUserName": to_user_name, "Content": content, "CreateTime": int(time.time())}]) -# ---------- 模型管理(多模型切换,API Key 按模型配置) ---------- +# ---------- 模型 ---------- def list_models() -> List[Dict]: - rows = _load("models") - return sorted(rows, key=lambda x: (not x.get("is_current"), x.get("name") or "")) - + with _LOCK: + conn = _conn() + try: + cur = conn.execute("SELECT * FROM models ORDER BY is_current DESC, name") + return [_row_to_dict(r) for r in cur.fetchall()] + finally: + conn.close() def get_model(model_id: str) -> Optional[Dict]: - for r in _load("models"): - if r.get("id") == model_id: - return r - return None - + with _LOCK: + conn = _conn() + try: + cur = conn.execute("SELECT * FROM models WHERE id = ?", (model_id,)) + row = cur.fetchone() + return _row_to_dict(row) if row else None + finally: + conn.close() def get_current_model() -> Optional[Dict]: - for r in _load("models"): - if r.get("is_current"): - return r - return None - + with _LOCK: + conn = _conn() + try: + cur = conn.execute("SELECT * FROM models WHERE is_current = 1 LIMIT 1") + row = cur.fetchone() + return _row_to_dict(row) if row else None + finally: + conn.close() def create_model( name: str, @@ -309,36 +441,35 @@ def create_model( model_name: str = "", is_current: bool = False, ) -> Dict: - rows = _load("models") - if is_current: - for r in rows: - r["is_current"] = False - rid = str(uuid.uuid4()) - if provider == "qwen": - default_base = "https://dashscope.aliyuncs.com/compatible-mode/v1" - default_model = "qwen-turbo" - elif provider == "doubao": - default_base = "https://ark.cn-beijing.volces.com/api/v3" - default_model = "doubao-seed-2-0-pro-260215" - else: - default_base = "https://api.openai.com/v1" - default_model = "gpt-3.5-turbo" - row = { - "id": rid, - "name": name, - "provider": provider, - "api_key": api_key, - "base_url": (base_url or default_base).strip(), - "model_name": (model_name or default_model).strip(), - "is_current": is_current or len(rows) == 0, - } - if row["is_current"]: - for r in rows: - r["is_current"] = False - rows.append(row) - _save("models", rows) - return row - + with _LOCK: + conn = _conn() + try: + if is_current: + conn.execute("UPDATE models SET is_current = 0") + rid = str(uuid.uuid4()) + default_base = "https://api.openai.com/v1" + default_model = "gpt-3.5-turbo" + if provider == "qwen": + default_base = "https://dashscope.aliyuncs.com/compatible-mode/v1" + default_model = "qwen-turbo" + elif provider == "doubao": + default_base = "https://ark.cn-beijing.volces.com/api/v3" + default_model = "doubao-seed-2-0-pro-260215" + base_url = (base_url or default_base).strip() + model_name = (model_name or default_model).strip() + cur = conn.execute("SELECT COUNT(*) as c FROM models") + is_current = is_current or cur.fetchone()["c"] == 0 + if is_current: + conn.execute("UPDATE models SET is_current = 0") + conn.execute( + "INSERT INTO models (id, name, provider, api_key, base_url, model_name, is_current) VALUES (?,?,?,?,?,?,?)", + (rid, name, provider, api_key, base_url, model_name, 1 if is_current else 0) + ) + conn.commit() + cur = conn.execute("SELECT * FROM models WHERE id = ?", (rid,)) + return _row_to_dict(cur.fetchone()) + finally: + conn.close() def update_model( model_id: str, @@ -347,44 +478,104 @@ def update_model( base_url: Optional[str] = None, model_name: Optional[str] = None, ) -> Optional[Dict]: - rows = _load("models") - for r in rows: - if r.get("id") == model_id: - if name is not None: - r["name"] = name - if api_key is not None: - r["api_key"] = api_key - if base_url is not None: - r["base_url"] = base_url - if model_name is not None: - r["model_name"] = model_name - _save("models", rows) - return r - return None - + with _LOCK: + conn = _conn() + try: + updates, params = [], [] + for k, v in (("name", name), ("api_key", api_key), ("base_url", base_url), ("model_name", model_name)): + if v is not None: + updates.append(f"{k} = ?") + params.append(v) + if not updates: + cur = conn.execute("SELECT * FROM models WHERE id = ?", (model_id,)) + row = cur.fetchone() + return _row_to_dict(row) if row else None + params.append(model_id) + conn.execute(f"UPDATE models SET {', '.join(updates)} WHERE id = ?", params) + conn.commit() + cur = conn.execute("SELECT * FROM models WHERE id = ?", (model_id,)) + row = cur.fetchone() + return _row_to_dict(row) if row else None + finally: + conn.close() def set_current_model(model_id: str) -> Optional[Dict]: - rows = _load("models") - found = None - for r in rows: - if r.get("id") == model_id: - r["is_current"] = True - found = r - else: - r["is_current"] = False - if found: - _save("models", rows) - return found - + with _LOCK: + conn = _conn() + try: + conn.execute("UPDATE models SET is_current = 0") + conn.execute("UPDATE models SET is_current = 1 WHERE id = ?", (model_id,)) + conn.commit() + cur = conn.execute("SELECT * FROM models WHERE id = ?", (model_id,)) + row = cur.fetchone() + return _row_to_dict(row) if row else None + finally: + conn.close() def delete_model(model_id: str) -> bool: - rows = _load("models") - for i, r in enumerate(rows): - if r.get("id") == model_id: - was_current = r.get("is_current") - rows.pop(i) - if was_current and rows: - rows[0]["is_current"] = True - _save("models", rows) + with _LOCK: + conn = _conn() + try: + cur = conn.execute("SELECT is_current FROM models WHERE id = ?", (model_id,)) + row = cur.fetchone() + if not row: + return False + was_current = row["is_current"] + conn.execute("DELETE FROM models WHERE id = ?", (model_id,)) + if was_current: + cur = conn.execute("SELECT id FROM models LIMIT 1") + first = cur.fetchone() + if first: + conn.execute("UPDATE models SET is_current = 1 WHERE id = ?", (first["id"],)) + conn.commit() return True - return False + finally: + conn.close() + + +# ---------- AI 回复配置 ---------- +def get_ai_reply_config(key: str) -> Optional[Dict]: + with _LOCK: + conn = _conn() + try: + cur = conn.execute("SELECT * FROM ai_reply_config WHERE key = ?", (key,)) + row = cur.fetchone() + return _row_to_dict(row) if row else None + finally: + conn.close() + +def update_ai_reply_config( + key: str, + super_admin_wxids: Optional[List[str]] = None, + whitelist_wxids: Optional[List[str]] = None, +) -> Dict: + with _LOCK: + conn = _conn() + try: + cur = conn.execute("SELECT * FROM ai_reply_config WHERE key = ?", (key,)) + row = cur.fetchone() + if row: + updates, params = [], [] + if super_admin_wxids is not None: + updates.append("super_admin_wxids = ?") + params.append(json.dumps([str(x).strip() for x in super_admin_wxids if str(x).strip()], ensure_ascii=False)) + if whitelist_wxids is not None: + updates.append("whitelist_wxids = ?") + params.append(json.dumps([str(x).strip() for x in whitelist_wxids if str(x).strip()], ensure_ascii=False)) + if updates: + params.append(key) + conn.execute(f"UPDATE ai_reply_config SET {', '.join(updates)} WHERE key = ?", params) + conn.commit() + cur = conn.execute("SELECT * FROM ai_reply_config WHERE key = ?", (key,)) + return _row_to_dict(cur.fetchone()) + super_admin_wxids = [str(x).strip() for x in (super_admin_wxids or []) if str(x).strip()] + whitelist_wxids = [str(x).strip() for x in (whitelist_wxids or []) if str(x).strip()] + conn.execute( + "INSERT INTO ai_reply_config (key, super_admin_wxids, whitelist_wxids) VALUES (?,?,?)", + (key, json.dumps(super_admin_wxids, ensure_ascii=False), json.dumps(whitelist_wxids, ensure_ascii=False)) + ) + conn.commit() + cur = conn.execute("SELECT * FROM ai_reply_config WHERE key = ?", (key,)) + return _row_to_dict(cur.fetchone()) + finally: + conn.close() diff --git a/public/chat.html b/public/chat.html index f0ede80..1b5b40e 100644 --- a/public/chat.html +++ b/public/chat.html @@ -27,11 +27,14 @@ .nav { display: flex; align-items: center; - gap: 16px; + justify-content: space-between; + flex-wrap: wrap; + gap: 12px; padding: 12px 24px; border-bottom: 1px solid var(--border); background: rgba(2, 6, 23, 0.95); } + .nav-links { display: flex; align-items: center; gap: 16px; flex-wrap: wrap; } .nav a { color: var(--muted); text-decoration: none; @@ -39,6 +42,8 @@ } .nav a:hover { color: var(--accent); } .nav a.current { color: var(--accent); font-weight: 500; } + .nav-banner { font-size: 18px; font-weight: 700; letter-spacing: 0.04em; color: var(--text); text-shadow: 0 0 20px rgba(34, 197, 94, 0.2); } + .nav-banner span { color: var(--accent); } .container { max-width: 720px; margin: 0 auto; @@ -64,12 +69,17 @@ padding: 8px; margin-bottom: 16px; } - .msg-item { font-size: 12px; padding: 8px 10px; border-bottom: 1px solid var(--border); } + .msg-item { font-size: 12px; padding: 8px 10px; border-bottom: 1px solid var(--border); display:flex; flex-direction:column; gap:4px; } .msg-item:last-child { border-bottom: none; } .msg-item .from { color: var(--accent); margin-right: 8px; } .msg-item.out { opacity: 0.9; } .msg-item.out .from { color: #94a3b8; } .msg-item .time { font-size: 11px; color: var(--muted); margin-left: 8px; } + .msg-item .meta { display:flex; align-items:center; flex-wrap:wrap; } + .msg-item .content { margin-left: 0; font-size: 12px; word-break: break-all; } + .msg-item .content img { max-width: 100%; border-radius: 6px; margin-top: 4px; } + .msg-item .content audio, + .msg-item .content video { max-width: 100%; margin-top: 4px; } .form-row { display: flex; gap: 12px; align-items: flex-end; flex-wrap: wrap; margin-bottom: 10px; } .form-row label { display: block; font-size: 12px; color: var(--muted); margin-bottom: 4px; } .form-row input { padding: 8px 12px; border: 1px solid var(--border); border-radius: 8px; background: rgba(15,23,42,0.9); color: var(--text); font-size: 13px; } @@ -81,10 +91,13 @@
@@ -117,42 +130,119 @@ const API_BASE = 'http://localhost:8000'; const KEY_STORAGE = 'wechat_key'; - function getKey() { - const k = $('key').value.trim(); - if (!k) { alert('请先填写账号 key'); return null; } - return k; + function getToken() { + try { + return localStorage.getItem('auth_token') || ''; + } catch (_) { + return ''; + } + } + + function redirectToLogin(msg) { + if (msg) alert(msg); + window.location.href = 'index.html'; } async function callApi(path, options = {}) { const url = API_BASE + path; - const res = await fetch(url, { ...options, headers: { 'Content-Type': 'application/json', ...(options.headers || {}) } }); + const token = getToken(); + const headers = { 'Content-Type': 'application/json', ...(options.headers || {}) }; + if (token) headers['Authorization'] = 'Bearer ' + token; + + const res = await fetch(url, { ...options, headers }); let body = null; try { body = await res.json(); } catch (_) {} + if (res.status === 401) { + redirectToLogin('登录已失效,请重新扫码登录'); + throw new Error('unauthorized'); + } if (!res.ok) throw new Error((body && (body.detail || body.message)) || res.statusText || '请求失败'); return body; } + function renderMessageContent(m) { + const msgType = m.MsgType ?? m.msgType; + const rawContent = m.Content || m.content || ''; + const imageContent = m.ImageContent || m.imageContent || ''; + const from = m.FromUserName || m.from || m.MsgId || '-'; + + // 链接检测 + const isUrl = (s) => typeof s === 'string' && /^https?:\/\//i.test(s.trim()); + const isBase64Like = (s) => typeof s === 'string' && /^[A-Za-z0-9+/=\s]+$/.test(s) && s.replace(/\s+/g, '').length > 60; + + // 图片:上游通常提供 ImageContent(base64)或 Content 为图片链接 + if (imageContent || (msgType === 3) || (msgType === 0 && imageContent)) { + const src = isUrl(imageContent) ? imageContent : + (imageContent ? ('data:image/png;base64,' + imageContent.replace(/\s+/g, '')) : + (isUrl(rawContent) ? rawContent : '')); + if (src) { + return `
${rawContent ? String(rawContent) : ''}
图片消息
`; + } + } + + // 视频 / 音频:简单通过扩展名或 MsgType 约定判断 + if (msgType === 43 || msgType === 'video') { + const src = isUrl(rawContent) ? rawContent : ''; + if (src) return `
`; + } + if (msgType === 34 || msgType === 'audio') { + const src = isUrl(rawContent) ? rawContent : ''; + if (src) return `
`; + } + + // 若内容是 URL,则渲染为可点击链接 + if (isUrl(rawContent)) { + const safe = String(rawContent); + return ``; + } + + // 若内容看起来是图片 base64,则按图片渲染 + if (isBase64Like(rawContent) && (msgType === 3 || msgType === 0)) { + const src = 'data:image/png;base64,' + String(rawContent).replace(/\s+/g, ''); + return `
图片消息
`; + } + + // 兜底为纯文本(含 MsgType 提示) + const text = rawContent ? String(rawContent) : (msgType != null ? `MsgType=${msgType}` : ''); + return `
${text}
`; + } + async function loadMessages() { - const key = $('key').value.trim(); - if (!key) { $('message-list').innerHTML = '

请先填写账号 key。

'; return; } + const key = (function() { + try { + return localStorage.getItem(KEY_STORAGE) || ''; + } catch (_) { + return ''; + } + })(); + if (!key) { + $('message-list').innerHTML = '

请先在登录页扫码登录。

'; + return; + } try { + // 后端已按时间倒序(最新在前)返回,这里保持顺序即可 const data = await callApi('/api/messages?key=' + encodeURIComponent(key) + '&limit=80'); const list = data.items || []; - // 按时间正序排列,最新在底部,便于看完整对话 - const sorted = [...list].sort((a, b) => (a.CreateTime || 0) - (b.CreateTime || 0)); - $('message-list').innerHTML = sorted.length ? sorted.map(m => { + $('message-list').innerHTML = list.length ? list.map(m => { const isOut = m.direction === 'out'; - const from = isOut ? ('我 → ' + (m.ToUserName || '')) : (m.FromUserName || m.from || m.MsgId || '-').toString().slice(0, 32); - const content = (m.Content || m.content || m.MsgType || '').toString().slice(0, 200); - const time = m.CreateTime ? (typeof m.CreateTime === 'number' ? new Date(m.CreateTime * 1000).toLocaleTimeString('zh-CN', { hour12: false }) : m.CreateTime) : ''; - return '
' + from + '' + content + (time ? ' ' + time + '' : '') + '
'; - }).join('') : '

暂无对话。请确保已登录且后端 WS 已连接 GetSyncMsg;发送的消息也会在此展示。

'; - } catch (e) { $('message-list').innerHTML = '

加载失败: ' + e.message + '

'; } + const fromLabel = isOut ? ('我 → ' + (m.ToUserName || '')) : (m.FromUserName || m.from || m.MsgId || '-').toString().slice(0, 32); + const time = m.CreateTime ? (typeof m.CreateTime === 'number' + ? new Date(m.CreateTime * 1000).toLocaleTimeString('zh-CN', { hour12: false }) + : m.CreateTime) : ''; + const meta = '
' + fromLabel + '' + (time ? '' + time + '' : '') + '
'; + const body = renderMessageContent(m); + return '
' + meta + body + '
'; + }).join('') : '

暂无对话。请确保已登录且后端 WS 已连接 GetSyncMsg;发送的消息(含图片、音视频等)也会在此展示。

'; + } catch (e) { + $('message-list').innerHTML = '

加载失败: ' + e.message + '

'; + } } async function sendMessage() { - const key = getKey(); - if (!key) return; + const key = (function() { + try { return localStorage.getItem(KEY_STORAGE) || ''; } catch (_) { return ''; } + })(); + if (!key) { alert('请先在登录页扫码登录'); return; } const to = $('send-to').value.trim(); const content = $('send-content').value.trim(); if (!to || !content) { alert('请填写对方用户名和内容'); return; } @@ -163,10 +253,14 @@ } catch (e) { alert('发送失败: ' + e.message); } } + // 隐藏 key 行,仅内部使用 localStorage 中的 key + (function hideKeyRow() { + const row = document.querySelector('.key-row'); + if (row) row.style.display = 'none'; + })(); + $('btn-refresh-msg').addEventListener('click', loadMessages); $('btn-send-msg').addEventListener('click', sendMessage); - $('key').addEventListener('change', function() { try { localStorage.setItem(KEY_STORAGE, this.value.trim()); } catch (_) {} }); - if (typeof localStorage !== 'undefined' && localStorage.getItem(KEY_STORAGE)) $('key').value = localStorage.getItem(KEY_STORAGE); loadMessages(); (function wsStatusCheck() { diff --git a/public/index.html b/public/index.html index 91ed3e4..62711ac 100644 --- a/public/index.html +++ b/public/index.html @@ -510,6 +510,7 @@

Wechat 智能托管服务

+

API 文档 (Swagger)

diff --git a/public/manage.html b/public/manage.html index 41e388e..22f402c 100644 --- a/public/manage.html +++ b/public/manage.html @@ -27,11 +27,14 @@ .nav { display: flex; align-items: center; - gap: 16px; + justify-content: space-between; + flex-wrap: wrap; + gap: 12px; padding: 12px 24px; border-bottom: 1px solid var(--border); background: rgba(2, 6, 23, 0.95); } + .nav-links { display: flex; align-items: center; gap: 16px; flex-wrap: wrap; } .nav a { color: var(--muted); text-decoration: none; @@ -39,6 +42,14 @@ } .nav a:hover { color: var(--accent); } .nav a.current { color: var(--accent); font-weight: 500; } + .nav-banner { + font-size: 18px; + font-weight: 700; + letter-spacing: 0.04em; + color: var(--text); + text-shadow: 0 0 20px rgba(34, 197, 94, 0.2); + } + .nav-banner span { color: var(--accent); } .container { max-width: 960px; margin: 0 auto; @@ -77,8 +88,6 @@ .tag { display: inline-block; padding: 4px 10px; border-radius: 999px; background: rgba(30,41,59,0.9); border: 1px solid var(--border); font-size: 12px; margin-right: 6px; margin-bottom: 6px; } .primary { padding: 10px 20px; border-radius: 8px; background: var(--accent); color: #000; border: none; cursor: pointer; font-weight: 500; } .secondary { padding: 8px 16px; border-radius: 8px; background: rgba(30,41,59,0.9); border: 1px solid var(--border); color: var(--text); cursor: pointer; font-size: 13px; } - .key-row { display: flex; align-items: center; gap: 12px; margin-bottom: 16px; } - .key-row input { max-width: 280px; padding: 8px 12px; border: 1px solid var(--border); border-radius: 8px; background: rgba(15,23,42,0.9); color: var(--text); } .error-msg { color: #f87171; font-size: 12px; } .tags-chips { display: inline-flex; flex-wrap: wrap; gap: 6px; } .tags-chips .chip { display: inline-flex; align-items: center; gap: 4px; padding: 2px 8px; border-radius: 999px; background: var(--accent-soft); border: 1px solid var(--accent); font-size: 12px; color: var(--accent); } @@ -88,26 +97,35 @@
-
客户与消息管理(R1-2 / R1-3 / R1-4)
-
- - -
+
客户与消息管理
+
+
-
+
+ + +
从联系人选择(多选可批量填入)
+
+ + +
+
@@ -127,7 +145,7 @@
- +
@@ -140,6 +158,27 @@
+
快速群发(选好友/客户后一键分发)
+
+
+ +
+ + 已选 0 人 +
+
+
+
+ +
+
发送图片消息(快捷方式)
+
+
+
+
+ +
+
商品标签
@@ -157,6 +196,21 @@
+
+

分级处理:仅超级管理员白名单中的联系人会收到 AI 自动回复,其他消息一律不回复。

+
+
+ + +
+
+ + +
+ +
+

保存后,仅上述列表中的发信人会被 AI 接管回复;未在列表中的联系人发来的消息将不会触发自动回复。

+
+ + + diff --git a/run-docker.sh b/run-docker.sh index 3acb5db..54c5df9 100755 --- a/run-docker.sh +++ b/run-docker.sh @@ -5,6 +5,8 @@ set -e IMAGE_NAME="wechat-admin-backend" CONTAINER_NAME="wechat-admin-backend" PORT="${PORT:-3000}" +# 数据目录挂载到宿主机,防止容器删除后丢失(SQLite 库 wechat.db 及表数据) +HOST_DATA_DIR="${HOST_DATA_DIR:-$(pwd)/data}" echo "Building Docker image: ${IMAGE_NAME}..." docker build -t "${IMAGE_NAME}" . @@ -14,6 +16,9 @@ if [ "$(docker ps -aq -f name=${CONTAINER_NAME})" ]; then docker rm -f "${CONTAINER_NAME}" >/dev/null 2>&1 || true fi +mkdir -p "${HOST_DATA_DIR}" +echo "Data dir (host): ${HOST_DATA_DIR} -> container /app/backend/data" + ENV_FILE=".env" if [ ! -f "${ENV_FILE}" ]; then echo "Env file ${ENV_FILE} not found, copying from .env.example ..." @@ -26,7 +31,9 @@ docker run -d \ --env-file "${ENV_FILE}" \ -p "${PORT}:3000" \ -p "8000:8000" \ + -v "${HOST_DATA_DIR}:/app/backend/data" \ "${IMAGE_NAME}" -echo "Container started. Health check: curl http://localhost:${PORT}/health" +echo "Container started. Data persisted on host: ${HOST_DATA_DIR}" +echo "Health check: curl http://localhost:${PORT}/health"