# -*- coding: utf-8 -*- """数据库存储:客户档案、定时问候、商品标签、推送群组/任务、同步消息、回调原始日志、模型、AI 回复配置。使用 SQLite,便于增删改查。""" import datetime import json import threading import time import uuid from typing import Any, Dict, List, Optional try: from backend import db except ImportError: import db _LOCK = threading.Lock() def _conn(): conn = db.get_conn() db.init_schema(conn) return conn def _row_to_dict(row) -> dict: if row is None: return {} d = dict(row) out = {} for k, v in d.items(): if k in ("tags", "customer_tags", "customer_ids", "tag_ids", "super_admin_wxids", "whitelist_wxids") and isinstance(v, str): try: out[k] = json.loads(v) if v else [] except Exception: out[k] = [] elif k in ("use_qwen", "enabled", "is_current") and v is not None: out[k] = bool(v) else: out[k] = v return out # ---------- 客户档案 ---------- def list_customers(key: Optional[str] = None) -> List[Dict]: with _LOCK: conn = _conn() try: if key: cur = conn.execute("SELECT * FROM customers WHERE key = ? ORDER BY remark_name, wxid", (key,)) else: cur = conn.execute("SELECT * FROM customers ORDER BY remark_name, wxid") return [_row_to_dict(r) for r in cur.fetchall()] finally: conn.close() def get_customer(customer_id: str) -> Optional[Dict]: with _LOCK: conn = _conn() try: cur = conn.execute("SELECT * FROM customers WHERE id = ?", (customer_id,)) row = cur.fetchone() return _row_to_dict(row) if row else None finally: conn.close() def upsert_customer(key: str, wxid: str, remark_name: str = "", region: str = "", age: str = "", gender: str = "", level: str = "", tags: Optional[List[str]] = None, customer_id: Optional[str] = None) -> Dict: tags = tags or [] rid = customer_id or str(uuid.uuid4()) tags_json = json.dumps(tags, ensure_ascii=False) with _LOCK: conn = _conn() try: if customer_id: conn.execute( "UPDATE customers SET key=?, wxid=?, remark_name=?, region=?, age=?, gender=?, level=?, tags=? WHERE id=?", (key, wxid, remark_name, region, age, gender, level, tags_json, customer_id) ) conn.commit() cur = conn.execute("SELECT * FROM customers WHERE id = ?", (customer_id,)) return _row_to_dict(cur.fetchone()) cur = conn.execute("SELECT id FROM customers WHERE key = ? AND wxid = ?", (key, wxid)) row = cur.fetchone() if row: conn.execute( "UPDATE customers SET remark_name=?, region=?, age=?, gender=?, level=?, tags=? WHERE id=?", (remark_name, region, age, gender, level, tags_json, row["id"]) ) conn.commit() cur = conn.execute("SELECT * FROM customers WHERE id = ?", (row["id"],)) return _row_to_dict(cur.fetchone()) conn.execute( "INSERT INTO customers (id, key, wxid, remark_name, region, age, gender, level, tags) VALUES (?,?,?,?,?,?,?,?,?)", (rid, key, wxid, remark_name, region, age, gender, level, tags_json) ) conn.commit() cur = conn.execute("SELECT * FROM customers WHERE id = ?", (rid,)) return _row_to_dict(cur.fetchone()) finally: conn.close() def delete_customer(customer_id: str) -> bool: with _LOCK: conn = _conn() try: cur = conn.execute("DELETE FROM customers WHERE id = ?", (customer_id,)) conn.commit() return cur.rowcount > 0 finally: conn.close() def list_customer_tags(key: str) -> List[str]: rows = [r for r in list_customers(key=key) if r] tags_set = set() for r in rows: for t in (r.get("tags") or []): if t and str(t).strip(): tags_set.add(str(t).strip()) return sorted(tags_set) # ---------- 定时问候任务 ---------- def list_greeting_tasks(key: Optional[str] = None) -> List[Dict]: with _LOCK: conn = _conn() try: if key: cur = conn.execute("SELECT * FROM greeting_tasks WHERE key = ? ORDER BY send_time", (key,)) else: cur = conn.execute("SELECT * FROM greeting_tasks ORDER BY send_time") return [_row_to_dict(r) for r in cur.fetchall()] finally: conn.close() def get_greeting_task(task_id: str) -> Optional[Dict]: with _LOCK: conn = _conn() try: cur = conn.execute("SELECT * FROM greeting_tasks WHERE id = ?", (task_id,)) row = cur.fetchone() return _row_to_dict(row) if row else None finally: conn.close() def create_greeting_task(key: str, name: str, send_time: str, customer_tags: List[str], template: str, use_qwen: bool = False) -> Dict: rid = str(uuid.uuid4()) row = { "id": rid, "key": key, "name": name, "send_time": send_time, "customer_tags": customer_tags or [], "template": template, "use_qwen": use_qwen, "enabled": True, "executed_at": None, } with _LOCK: conn = _conn() try: conn.execute( "INSERT INTO greeting_tasks (id, key, name, send_time, customer_tags, template, use_qwen, enabled, executed_at) VALUES (?,?,?,?,?,?,?,?,?)", (rid, key, name, send_time, json.dumps(customer_tags or [], ensure_ascii=False), template, 1 if use_qwen else 0, 1, None) ) conn.commit() return row finally: conn.close() def update_greeting_task(task_id: str, **kwargs) -> Optional[Dict]: allowed = ("name", "send_time", "cron", "customer_tags", "template", "use_qwen", "enabled", "executed_at") with _LOCK: conn = _conn() try: cur = conn.execute("SELECT * FROM greeting_tasks WHERE id = ?", (task_id,)) row = cur.fetchone() if not row: return None updates = [] params = [] if "send_time" in kwargs: updates.append("send_time = ?") params.append(kwargs["send_time"]) if "cron" in kwargs: updates.append("send_time = ?") params.append(kwargs["cron"]) for k in ("name", "customer_tags", "template", "use_qwen", "enabled", "executed_at"): if k in kwargs: v = kwargs[k] if k == "customer_tags": updates.append("customer_tags = ?") params.append(json.dumps(v, ensure_ascii=False)) elif k == "use_qwen": updates.append("use_qwen = ?") params.append(1 if v else 0) elif k == "enabled": updates.append("enabled = ?") params.append(1 if v else 0) else: updates.append(f"{k} = ?") params.append(v) if not updates: return _row_to_dict(row) params.append(task_id) conn.execute(f"UPDATE greeting_tasks SET {', '.join(updates)} WHERE id = ?", params) conn.commit() cur = conn.execute("SELECT * FROM greeting_tasks WHERE id = ?", (task_id,)) return _row_to_dict(cur.fetchone()) finally: conn.close() def delete_greeting_task(task_id: str) -> bool: with _LOCK: conn = _conn() try: cur = conn.execute("DELETE FROM greeting_tasks WHERE id = ?", (task_id,)) conn.commit() return cur.rowcount > 0 finally: conn.close() # ---------- 商品标签 ---------- def list_product_tags(key: Optional[str] = None) -> List[Dict]: with _LOCK: conn = _conn() try: if key: cur = conn.execute("SELECT * FROM product_tags WHERE key = ?", (key,)) else: cur = conn.execute("SELECT * FROM product_tags") return [_row_to_dict(r) for r in cur.fetchall()] finally: conn.close() def create_product_tag(key: str, name: str) -> Dict: rid = str(uuid.uuid4()) with _LOCK: conn = _conn() try: conn.execute("INSERT INTO product_tags (id, key, name) VALUES (?,?,?)", (rid, key, name)) conn.commit() cur = conn.execute("SELECT * FROM product_tags WHERE id = ?", (rid,)) return _row_to_dict(cur.fetchone()) finally: conn.close() def delete_product_tag(tag_id: str) -> bool: with _LOCK: conn = _conn() try: cur = conn.execute("DELETE FROM product_tags WHERE id = ?", (tag_id,)) conn.commit() return cur.rowcount > 0 finally: conn.close() # ---------- 推送群组 ---------- def list_push_groups(key: Optional[str] = None) -> List[Dict]: with _LOCK: conn = _conn() try: if key: cur = conn.execute("SELECT * FROM push_groups WHERE key = ?", (key,)) else: cur = conn.execute("SELECT * FROM push_groups") return [_row_to_dict(r) for r in cur.fetchall()] finally: conn.close() def create_push_group(key: str, name: str, customer_ids: List[str], tag_ids: List[str]) -> Dict: rid = str(uuid.uuid4()) with _LOCK: conn = _conn() try: conn.execute( "INSERT INTO push_groups (id, key, name, customer_ids, tag_ids) VALUES (?,?,?,?,?)", (rid, key, name, json.dumps(customer_ids or [], ensure_ascii=False), json.dumps(tag_ids or [], ensure_ascii=False)) ) conn.commit() cur = conn.execute("SELECT * FROM push_groups WHERE id = ?", (rid,)) return _row_to_dict(cur.fetchone()) finally: conn.close() def update_push_group(group_id: str, name: Optional[str] = None, customer_ids: Optional[List[str]] = None, tag_ids: Optional[List[str]] = None) -> Optional[Dict]: with _LOCK: conn = _conn() try: cur = conn.execute("SELECT * FROM push_groups WHERE id = ?", (group_id,)) if cur.fetchone() is None: return None updates, params = [], [] if name is not None: updates.append("name = ?") params.append(name) if customer_ids is not None: updates.append("customer_ids = ?") params.append(json.dumps(customer_ids, ensure_ascii=False)) if tag_ids is not None: updates.append("tag_ids = ?") params.append(json.dumps(tag_ids, ensure_ascii=False)) if not updates: cur = conn.execute("SELECT * FROM push_groups WHERE id = ?", (group_id,)) return _row_to_dict(cur.fetchone()) params.append(group_id) conn.execute(f"UPDATE push_groups SET {', '.join(updates)} WHERE id = ?", params) conn.commit() cur = conn.execute("SELECT * FROM push_groups WHERE id = ?", (group_id,)) return _row_to_dict(cur.fetchone()) finally: conn.close() def delete_push_group(group_id: str) -> bool: with _LOCK: conn = _conn() try: cur = conn.execute("DELETE FROM push_groups WHERE id = ?", (group_id,)) conn.commit() return cur.rowcount > 0 finally: conn.close() # ---------- 推送任务 ---------- def list_push_tasks(key: Optional[str] = None, limit: int = 200) -> List[Dict]: with _LOCK: conn = _conn() try: if key: cur = conn.execute("SELECT * FROM push_tasks WHERE key = ? ORDER BY created_at DESC LIMIT ?", (key, limit)) else: cur = conn.execute("SELECT * FROM push_tasks ORDER BY created_at DESC LIMIT ?", (limit,)) return [_row_to_dict(r) for r in cur.fetchall()] finally: conn.close() def create_push_task(key: str, product_tag_id: str, group_id: str, content: str, send_at: Optional[str] = None) -> Dict: rid = str(uuid.uuid4()) created = time.strftime("%Y-%m-%dT%H:%M:%S") with _LOCK: conn = _conn() try: conn.execute( "INSERT INTO push_tasks (id, key, product_tag_id, group_id, content, send_at, status, created_at) VALUES (?,?,?,?,?,?,?,?)", (rid, key, product_tag_id, group_id, content, send_at, "pending", created) ) conn.commit() cur = conn.execute("SELECT * FROM push_tasks WHERE id = ?", (rid,)) return _row_to_dict(cur.fetchone()) finally: conn.close() def update_push_task_status(task_id: str, status: str) -> Optional[Dict]: with _LOCK: conn = _conn() try: conn.execute("UPDATE push_tasks SET status = ? WHERE id = ?", (status, task_id)) conn.commit() cur = conn.execute("SELECT * FROM push_tasks WHERE id = ?", (task_id,)) row = cur.fetchone() return _row_to_dict(row) if row else None finally: conn.close() # ---------- 同步消息 ---------- def append_sync_messages(key: str, messages: List[Dict], max_per_key: int = 500) -> None: with _LOCK: conn = _conn() try: for m in messages: create_time = int(m.get("CreateTime") or 0) if isinstance(m.get("CreateTime"), (int, float)) else 0 conn.execute("INSERT INTO sync_messages (key, create_time, payload) VALUES (?,?,?)", (key, create_time, json.dumps(m, ensure_ascii=False))) conn.commit() # 每个 key 只保留最近 max_per_key 条 cur = conn.execute("SELECT id FROM sync_messages WHERE key = ? ORDER BY create_time DESC", (key,)) rows = cur.fetchall() if len(rows) > max_per_key: to_del = [r["id"] for r in rows[max_per_key:]] placeholders = ",".join("?" * len(to_del)) conn.execute(f"DELETE FROM sync_messages WHERE id IN ({placeholders})", to_del) conn.commit() finally: conn.close() def list_sync_messages(key: str, limit: int = 100) -> List[Dict]: with _LOCK: conn = _conn() try: cur = conn.execute( "SELECT payload FROM sync_messages WHERE key = ? ORDER BY create_time DESC LIMIT ?", (key, limit) ) rows = cur.fetchall() out = [] for r in rows: try: out.append(json.loads(r["payload"])) except Exception: pass return out finally: conn.close() def append_sent_message(key: str, to_user_name: str, content: str) -> None: append_sync_messages(key, [{"direction": "out", "ToUserName": to_user_name, "Content": content, "CreateTime": int(time.time())}]) def append_callback_log(key: str, raw_body: dict, max_raw_len: int = 51200) -> None: """将 7006 回调的原始 body 落库,便于回溯与统计。raw_body 序列化后截断,避免单条过大。""" received_at = datetime.datetime.utcnow().isoformat() + "Z" raw_str = json.dumps(raw_body, ensure_ascii=False) if len(raw_str) > max_raw_len: raw_str = raw_str[:max_raw_len] + "...[truncated]" with _LOCK: conn = _conn() try: conn.execute( "INSERT INTO callback_log (key, received_at, raw_body) VALUES (?,?,?)", (key, received_at, raw_str), ) conn.commit() # 每个 key 仅保留最近 2000 条原始回调 cur = conn.execute("SELECT id FROM callback_log WHERE key = ? ORDER BY id DESC", (key,)) rows = cur.fetchall() if len(rows) > 2000: to_del = [r["id"] for r in rows[2000:]] placeholders = ",".join("?" * len(to_del)) conn.execute(f"DELETE FROM callback_log WHERE id IN ({placeholders})", to_del) conn.commit() finally: conn.close() # ---------- 模型 ---------- def list_models() -> List[Dict]: with _LOCK: conn = _conn() try: cur = conn.execute("SELECT * FROM models ORDER BY is_current DESC, name") return [_row_to_dict(r) for r in cur.fetchall()] finally: conn.close() def get_model(model_id: str) -> Optional[Dict]: with _LOCK: conn = _conn() try: cur = conn.execute("SELECT * FROM models WHERE id = ?", (model_id,)) row = cur.fetchone() return _row_to_dict(row) if row else None finally: conn.close() def get_current_model() -> Optional[Dict]: with _LOCK: conn = _conn() try: cur = conn.execute("SELECT * FROM models WHERE is_current = 1 LIMIT 1") row = cur.fetchone() return _row_to_dict(row) if row else None finally: conn.close() def create_model( name: str, provider: str, api_key: str, base_url: str = "", model_name: str = "", is_current: bool = False, ) -> Dict: with _LOCK: conn = _conn() try: if is_current: conn.execute("UPDATE models SET is_current = 0") rid = str(uuid.uuid4()) default_base = "https://api.openai.com/v1" default_model = "gpt-3.5-turbo" if provider == "qwen": default_base = "https://dashscope.aliyuncs.com/compatible-mode/v1" default_model = "qwen-turbo" elif provider == "doubao": default_base = "https://ark.cn-beijing.volces.com/api/v3" default_model = "doubao-seed-2-0-pro-260215" base_url = (base_url or default_base).strip() model_name = (model_name or default_model).strip() cur = conn.execute("SELECT COUNT(*) as c FROM models") is_current = is_current or cur.fetchone()["c"] == 0 if is_current: conn.execute("UPDATE models SET is_current = 0") conn.execute( "INSERT INTO models (id, name, provider, api_key, base_url, model_name, is_current) VALUES (?,?,?,?,?,?,?)", (rid, name, provider, api_key, base_url, model_name, 1 if is_current else 0) ) conn.commit() cur = conn.execute("SELECT * FROM models WHERE id = ?", (rid,)) return _row_to_dict(cur.fetchone()) finally: conn.close() def update_model( model_id: str, name: Optional[str] = None, api_key: Optional[str] = None, base_url: Optional[str] = None, model_name: Optional[str] = None, ) -> Optional[Dict]: with _LOCK: conn = _conn() try: updates, params = [], [] for k, v in (("name", name), ("api_key", api_key), ("base_url", base_url), ("model_name", model_name)): if v is not None: updates.append(f"{k} = ?") params.append(v) if not updates: cur = conn.execute("SELECT * FROM models WHERE id = ?", (model_id,)) row = cur.fetchone() return _row_to_dict(row) if row else None params.append(model_id) conn.execute(f"UPDATE models SET {', '.join(updates)} WHERE id = ?", params) conn.commit() cur = conn.execute("SELECT * FROM models WHERE id = ?", (model_id,)) row = cur.fetchone() return _row_to_dict(row) if row else None finally: conn.close() def set_current_model(model_id: str) -> Optional[Dict]: with _LOCK: conn = _conn() try: conn.execute("UPDATE models SET is_current = 0") conn.execute("UPDATE models SET is_current = 1 WHERE id = ?", (model_id,)) conn.commit() cur = conn.execute("SELECT * FROM models WHERE id = ?", (model_id,)) row = cur.fetchone() return _row_to_dict(row) if row else None finally: conn.close() def delete_model(model_id: str) -> bool: with _LOCK: conn = _conn() try: cur = conn.execute("SELECT is_current FROM models WHERE id = ?", (model_id,)) row = cur.fetchone() if not row: return False was_current = row["is_current"] conn.execute("DELETE FROM models WHERE id = ?", (model_id,)) if was_current: cur = conn.execute("SELECT id FROM models LIMIT 1") first = cur.fetchone() if first: conn.execute("UPDATE models SET is_current = 1 WHERE id = ?", (first["id"],)) conn.commit() return True finally: conn.close() # ---------- AI 回复配置 ---------- def get_ai_reply_config(key: str) -> Optional[Dict]: with _LOCK: conn = _conn() try: cur = conn.execute("SELECT * FROM ai_reply_config WHERE key = ?", (key,)) row = cur.fetchone() return _row_to_dict(row) if row else None finally: conn.close() def update_ai_reply_config( key: str, super_admin_wxids: Optional[List[str]] = None, whitelist_wxids: Optional[List[str]] = None, ) -> Dict: with _LOCK: conn = _conn() try: cur = conn.execute("SELECT * FROM ai_reply_config WHERE key = ?", (key,)) row = cur.fetchone() if row: updates, params = [], [] if super_admin_wxids is not None: updates.append("super_admin_wxids = ?") params.append(json.dumps([str(x).strip() for x in super_admin_wxids if str(x).strip()], ensure_ascii=False)) if whitelist_wxids is not None: updates.append("whitelist_wxids = ?") params.append(json.dumps([str(x).strip() for x in whitelist_wxids if str(x).strip()], ensure_ascii=False)) if updates: params.append(key) conn.execute(f"UPDATE ai_reply_config SET {', '.join(updates)} WHERE key = ?", params) conn.commit() cur = conn.execute("SELECT * FROM ai_reply_config WHERE key = ?", (key,)) return _row_to_dict(cur.fetchone()) super_admin_wxids = [str(x).strip() for x in (super_admin_wxids or []) if str(x).strip()] whitelist_wxids = [str(x).strip() for x in (whitelist_wxids or []) if str(x).strip()] conn.execute( "INSERT INTO ai_reply_config (key, super_admin_wxids, whitelist_wxids) VALUES (?,?,?)", (key, json.dumps(super_admin_wxids, ensure_ascii=False), json.dumps(whitelist_wxids, ensure_ascii=False)) ) conn.commit() cur = conn.execute("SELECT * FROM ai_reply_config WHERE key = ?", (key,)) return _row_to_dict(cur.fetchone()) finally: conn.close()