# -*- coding: utf-8 -*- """ 将 AI 提取的结构化数据合并到 SQLite 与 panel schema 及 situationData.getSituation 对齐,支持回放 """ import os import sqlite3 from datetime import datetime from pathlib import Path from typing import Any, Dict, Optional PROJECT_ROOT = Path(__file__).resolve().parent.parent DB_PATH = os.environ.get("DB_PATH", str(PROJECT_ROOT / "server" / "data.db")) def _ensure_tables(conn: sqlite3.Connection) -> None: """确保所需表存在(与 db.js 一致)""" conn.execute(""" CREATE TABLE IF NOT EXISTS situation_update ( id TEXT PRIMARY KEY, timestamp TEXT NOT NULL, category TEXT NOT NULL, summary TEXT NOT NULL, severity TEXT NOT NULL ) """) conn.execute(""" CREATE TABLE IF NOT EXISTS combat_losses ( side TEXT PRIMARY KEY CHECK (side IN ('us', 'iran')), bases_destroyed INTEGER NOT NULL, bases_damaged INTEGER NOT NULL, personnel_killed INTEGER NOT NULL, personnel_wounded INTEGER NOT NULL, aircraft INTEGER NOT NULL, warships INTEGER NOT NULL, armor INTEGER NOT NULL, vehicles INTEGER NOT NULL ) """) try: conn.execute("ALTER TABLE combat_losses ADD COLUMN civilian_killed INTEGER NOT NULL DEFAULT 0") except sqlite3.OperationalError: pass try: conn.execute("ALTER TABLE combat_losses ADD COLUMN civilian_wounded INTEGER NOT NULL DEFAULT 0") except sqlite3.OperationalError: pass try: conn.execute("ALTER TABLE combat_losses ADD COLUMN updated_at TEXT DEFAULT (datetime('now'))") except sqlite3.OperationalError: pass conn.execute("CREATE TABLE IF NOT EXISTS wall_street_trend (id INTEGER PRIMARY KEY AUTOINCREMENT, time TEXT NOT NULL, value INTEGER NOT NULL)") conn.execute("CREATE TABLE IF NOT EXISTS retaliation_current (id INTEGER PRIMARY KEY CHECK (id = 1), value INTEGER NOT NULL)") conn.execute("CREATE TABLE IF NOT EXISTS retaliation_history (id INTEGER PRIMARY KEY AUTOINCREMENT, time TEXT NOT NULL, value INTEGER NOT NULL)") conn.execute("CREATE TABLE IF NOT EXISTS situation (id INTEGER PRIMARY KEY CHECK (id = 1), data TEXT NOT NULL, updated_at TEXT NOT NULL)") conn.commit() def merge(extracted: Dict[str, Any], db_path: Optional[str] = None) -> bool: """将提取数据合并到 DB,返回是否有更新""" path = db_path or DB_PATH if not os.path.exists(path): return False conn = sqlite3.connect(path, timeout=10) try: _ensure_tables(conn) updated = False # situation_update if "situation_update" in extracted: u = extracted["situation_update"] uid = f"ai_{hash(u.get('summary','')+u.get('timestamp','')) % 10**10}" conn.execute( "INSERT OR IGNORE INTO situation_update (id, timestamp, category, summary, severity) VALUES (?, ?, ?, ?, ?)", (uid, u.get("timestamp", ""), u.get("category", "other"), u.get("summary", "")[:500], u.get("severity", "medium")), ) if conn.total_changes > 0: updated = True # combat_losses:增量叠加到当前值,无行则先插入初始行 if "combat_losses_delta" in extracted: for side, delta in extracted["combat_losses_delta"].items(): if side not in ("us", "iran"): continue try: row = conn.execute( "SELECT personnel_killed,personnel_wounded,civilian_killed,civilian_wounded,bases_destroyed,bases_damaged,aircraft,warships,armor,vehicles FROM combat_losses WHERE side = ?", (side,), ).fetchone() cur = {"personnel_killed": 0, "personnel_wounded": 0, "civilian_killed": 0, "civilian_wounded": 0, "bases_destroyed": 0, "bases_damaged": 0, "aircraft": 0, "warships": 0, "armor": 0, "vehicles": 0} if row: cur = { "personnel_killed": row[0], "personnel_wounded": row[1], "civilian_killed": row[2] or 0, "civilian_wounded": row[3] or 0, "bases_destroyed": row[4], "bases_damaged": row[5], "aircraft": row[6], "warships": row[7], "armor": row[8], "vehicles": row[9], } pk = max(0, (cur["personnel_killed"] or 0) + delta.get("personnel_killed", 0)) pw = max(0, (cur["personnel_wounded"] or 0) + delta.get("personnel_wounded", 0)) ck = max(0, (cur["civilian_killed"] or 0) + delta.get("civilian_killed", 0)) cw = max(0, (cur["civilian_wounded"] or 0) + delta.get("civilian_wounded", 0)) bd = max(0, (cur["bases_destroyed"] or 0) + delta.get("bases_destroyed", 0)) bm = max(0, (cur["bases_damaged"] or 0) + delta.get("bases_damaged", 0)) ac = max(0, (cur["aircraft"] or 0) + delta.get("aircraft", 0)) ws = max(0, (cur["warships"] or 0) + delta.get("warships", 0)) ar = max(0, (cur["armor"] or 0) + delta.get("armor", 0)) vh = max(0, (cur["vehicles"] or 0) + delta.get("vehicles", 0)) ts = datetime.utcnow().strftime("%Y-%m-%dT%H:%M:%S.000Z") if row: conn.execute( """UPDATE combat_losses SET personnel_killed=?, personnel_wounded=?, civilian_killed=?, civilian_wounded=?, bases_destroyed=?, bases_damaged=?, aircraft=?, warships=?, armor=?, vehicles=?, updated_at=? WHERE side=?""", (pk, pw, ck, cw, bd, bm, ac, ws, ar, vh, ts, side), ) else: conn.execute( """INSERT OR REPLACE INTO combat_losses (side, personnel_killed, personnel_wounded, civilian_killed, civilian_wounded, bases_destroyed, bases_damaged, aircraft, warships, armor, vehicles, updated_at) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)""", (side, pk, pw, ck, cw, bd, bm, ac, ws, ar, vh, ts), ) if conn.total_changes > 0: updated = True except Exception: pass # retaliation if "retaliation" in extracted: r = extracted["retaliation"] conn.execute("INSERT OR REPLACE INTO retaliation_current (id, value) VALUES (1, ?)", (r["value"],)) conn.execute("INSERT INTO retaliation_history (time, value) VALUES (?, ?)", (r["time"], r["value"])) updated = True # wall_street_trend if "wall_street" in extracted: w = extracted["wall_street"] conn.execute("INSERT INTO wall_street_trend (time, value) VALUES (?, ?)", (w["time"], w["value"])) updated = True # key_location:更新受袭基地 status/damage_level if "key_location_updates" in extracted: try: for u in extracted["key_location_updates"]: kw = (u.get("name_keywords") or "").replace("|", " ").split() side = u.get("side") status = u.get("status", "attacked")[:20] dmg = u.get("damage_level", 2) if not kw or side not in ("us", "iran"): continue conditions = " OR ".join( "(LOWER(name) LIKE ? OR name LIKE ?)" for _ in kw ) params = [status, dmg, side] for k in kw: params.extend([f"%{k}%", f"%{k}%"]) cur = conn.execute( f"UPDATE key_location SET status=?, damage_level=? WHERE side=? AND ({conditions})", params, ) if cur.rowcount > 0: updated = True except Exception: pass if updated: conn.execute("INSERT OR REPLACE INTO situation (id, data, updated_at) VALUES (1, '{}', ?)", (datetime.utcnow().strftime("%Y-%m-%dT%H:%M:%S.000Z"),)) conn.commit() return updated except Exception as e: conn.rollback() raise e finally: conn.close()