# Viewer residue (copied together with the source from a Git web UI), kept as comments:
#   path: usa/crawler/db_merge.py
#   last modified: 2026-03-06 10:34:52 +08:00
#   size: 320 lines, 19 KiB (Python)
#   viewer warning: this file contains Unicode characters that might be confused with
#   other characters; if that is intentional, the warning can be ignored.
# -*- coding: utf-8 -*-
"""
将 AI 提取的结构化数据合并到 SQLite
与 panel schema 及 situationData.getSituation 对齐,支持回放。
地图打击数据(与前端攻击动画一致):
- map_strike_sources: [{ "id": "israel"|"lincoln"|"ford", "name": "显示名", "lng", "lat" }] 写入 map_strike_source
- map_strike_lines: [{ "source_id", "target_lng", "target_lat", "target_name?", "struck_at?" }] 追加到 map_strike_line
爬虫/AI 可按此格式输出,落库后 GET /api/situation 的 mapData.strikeSources/strikeLines 会更新,前端直接追加攻击动画。
"""
import hashlib
import logging
import os
import sqlite3
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Dict, Optional
# Two directory levels above this file (the crawler package's parent directory).
PROJECT_ROOT = Path(__file__).resolve().parent.parent
# SQLite file kept in line with server/db.js; the DB_PATH environment variable overrides it.
DB_PATH = os.environ.get("DB_PATH", str(PROJECT_ROOT.joinpath("server", "data.db")))

# Per-merge upper bound on the increment applied to each combat_losses field. It stops a
# cumulative total that was mistakenly extracted as an increment from inflating the figures.
# A cap of 0 means "no limit".
MAX_DELTA_PER_MERGE = dict(
    personnel_killed=500,
    personnel_wounded=1000,
    civilian_killed=300,
    civilian_wounded=500,
    bases_destroyed=5,
    bases_damaged=10,
    aircraft=50,
    warships=10,
    armor=30,
    vehicles=100,
    drones=50,
    missiles=200,
    helicopters=20,
    submarines=5,
    carriers=2,
    civilian_ships=20,
    airport_port=10,
)

# Retaliation sentiment / Wall Street gauges: keep them in a sane range so a single extracted
# 0 or 100 cannot zero out or max out the indicator.
# Weight of the stored value when blending in a new reading; the new reading gets (1 - weight).
RETALIATION_SMOOTH_WEIGHT = 0.6
# Maximum rows kept in retaliation_history (frontend curve and replay).
RETALIATION_HISTORY_MAX_ROWS = 300
# Maximum rows kept in wall_street_trend, so the table cannot grow without bound.
WALL_STREET_TREND_MAX_ROWS = 200
# 0 and 100 are treated as anomalies: gauge values are clamped into [1, 99] before writing.
VALUE_CLAMP_MIN = 1
VALUE_CLAMP_MAX = 99
def _clamp_delta(key: str, value: Any, caps: Optional[Dict[str, int]] = None) -> int:
    """Clamp one per-merge increment for field ``key`` to ``[0, cap]``.

    The cap guards against an extractor reporting a cumulative total as if it were the
    increment from a single article.

    Args:
        key: combat_losses field name used to look up the cap.
        value: the proposed increment. Normally an int, but extractor output may also contain
            ``None``, floats or numeric strings. These are coerced (floats truncated toward
            zero); anything non-numeric counts as 0 instead of raising ``TypeError``.
        caps: field -> cap mapping; defaults to ``MAX_DELTA_PER_MERGE``. A missing key or a
            cap <= 0 means "no upper bound".

    Returns:
        A non-negative int.
    """
    if caps is None:
        caps = MAX_DELTA_PER_MERGE
    if isinstance(value, int):
        amount = value
    else:
        try:
            amount = int(float(value))
        except (TypeError, ValueError, OverflowError):
            return 0
    cap = caps.get(key, 0)
    if cap <= 0:
        return max(0, amount)
    return max(0, min(amount, cap))
def _add_column_if_missing(conn: sqlite3.Connection, table: str, column: str, decl: str) -> None:
    """Run ``ALTER TABLE <table> ADD COLUMN <column> <decl>`` unless the column already exists.

    ``table``, ``column`` and ``decl`` are fixed strings supplied by ``_ensure_tables``, never
    external input, so interpolating them into the SQL is safe.

    Raises:
        sqlite3.OperationalError: if the column is missing and cannot be added.
    """
    def has_column() -> bool:
        return any(row[1] == column for row in conn.execute(f"PRAGMA table_info({table})"))

    if has_column():
        return
    try:
        conn.execute(f"ALTER TABLE {table} ADD COLUMN {column} {decl}")
    except sqlite3.OperationalError:
        # Tolerate another writer having added the same column in the meantime; any other
        # failure is a real migration error and must not be hidden.
        if not has_column():
            raise


def _ensure_tables(conn: sqlite3.Connection) -> None:
    """Create the tables this module writes to and add columns missing from older schemas.

    Schema is kept in line with server/db.js. Safe to call repeatedly; commits at the end.

    Raises:
        sqlite3.Error: if a table cannot be created or a missing column cannot be added.
    """
    conn.execute("""
        CREATE TABLE IF NOT EXISTS situation_update (
            id TEXT PRIMARY KEY, timestamp TEXT NOT NULL, category TEXT NOT NULL,
            summary TEXT NOT NULL, severity TEXT NOT NULL
        )
    """)
    conn.execute("""
        CREATE TABLE IF NOT EXISTS combat_losses (
            side TEXT PRIMARY KEY CHECK (side IN ('us', 'iran')),
            bases_destroyed INTEGER NOT NULL, bases_damaged INTEGER NOT NULL,
            personnel_killed INTEGER NOT NULL, personnel_wounded INTEGER NOT NULL,
            aircraft INTEGER NOT NULL, warships INTEGER NOT NULL, armor INTEGER NOT NULL, vehicles INTEGER NOT NULL
        )
    """)
    # Columns added to combat_losses after its first schema, in migration order. Each
    # declaration must be accepted by ALTER TABLE ADD COLUMN on a populated table. SQLite
    # rejects non-constant defaults there, so the former `DEFAULT (datetime('now'))` on
    # updated_at made that migration always fail. merge() writes updated_at explicitly anyway.
    counter_decl = "INTEGER NOT NULL DEFAULT 0"
    migrations = [
        ("civilian_killed", counter_decl),
        ("civilian_wounded", counter_decl),
        ("updated_at", "TEXT"),
    ]
    migrations += [
        (name, counter_decl)
        for name in ("drones", "missiles", "helicopters", "submarines", "tanks", "carriers", "civilian_ships", "airport_port")
    ]
    for column, decl in migrations:
        _add_column_if_missing(conn, "combat_losses", column, decl)

    conn.execute("CREATE TABLE IF NOT EXISTS wall_street_trend (id INTEGER PRIMARY KEY AUTOINCREMENT, time TEXT NOT NULL, value INTEGER NOT NULL)")
    conn.execute("CREATE TABLE IF NOT EXISTS retaliation_current (id INTEGER PRIMARY KEY CHECK (id = 1), value INTEGER NOT NULL)")
    conn.execute("CREATE TABLE IF NOT EXISTS retaliation_history (id INTEGER PRIMARY KEY AUTOINCREMENT, time TEXT NOT NULL, value INTEGER NOT NULL)")
    conn.execute("CREATE TABLE IF NOT EXISTS situation (id INTEGER PRIMARY KEY CHECK (id = 1), data TEXT NOT NULL, updated_at TEXT NOT NULL)")
    # Map strike sources and strike lines (same schema as server/db.js), read by getSituation's
    # mapData and the frontend strike animation.
    conn.execute("""
        CREATE TABLE IF NOT EXISTS map_strike_source (
            id TEXT PRIMARY KEY,
            name TEXT NOT NULL,
            lng REAL NOT NULL,
            lat REAL NOT NULL
        )
    """)
    conn.execute("""
        CREATE TABLE IF NOT EXISTS map_strike_line (
            source_id TEXT NOT NULL,
            target_lng REAL NOT NULL,
            target_lat REAL NOT NULL,
            target_name TEXT,
            struck_at TEXT,
            FOREIGN KEY (source_id) REFERENCES map_strike_source(id)
        )
    """)
    try:
        conn.execute("CREATE INDEX IF NOT EXISTS idx_map_strike_line_source ON map_strike_line(source_id)")
    except sqlite3.OperationalError:
        # The index is only a lookup optimisation; its absence is not fatal.
        pass
    # Older map_strike_line tables were created without struck_at.
    _add_column_if_missing(conn, "map_strike_line", "struck_at", "TEXT")
    conn.commit()
logger = logging.getLogger(__name__)

# combat_losses counter columns, in the order they are read and written by the merge.
_COMBAT_LOSS_COLUMNS = (
    "personnel_killed", "personnel_wounded", "civilian_killed", "civilian_wounded",
    "bases_destroyed", "bases_damaged", "aircraft", "warships", "armor", "vehicles",
    "drones", "missiles", "helicopters", "submarines", "tanks", "carriers",
    "civilian_ships", "airport_port",
)
# Columns never incremented from an AI delta: an existing value is carried over unchanged and
# a newly inserted row starts at 0.
_COMBAT_LOSS_CARRIED_COLUMNS = frozenset({"tanks"})
# Upper bound on missiles consumed that a single merge may add for one side.
_MAX_MISSILES_CONSUMED_PER_MERGE = 500
_SIDES = ("us", "iran")
_TS_FORMAT = "%Y-%m-%dT%H:%M:%S.000Z"


def _utc_now_iso() -> str:
    """Return the current UTC time formatted as ``YYYY-MM-DDTHH:MM:SS.000Z``."""
    # datetime.utcnow() is deprecated since Python 3.12; an aware UTC "now" formats identically.
    return datetime.now(timezone.utc).strftime(_TS_FORMAT)


def _as_int(value: Any) -> Optional[int]:
    """Return ``int(value)``, or None when the value cannot be converted."""
    try:
        return int(value)
    except (TypeError, ValueError, OverflowError):
        return None


def _clamp_pct(value: int) -> int:
    """Clamp a gauge value into ``[VALUE_CLAMP_MIN, VALUE_CLAMP_MAX]``."""
    return max(VALUE_CLAMP_MIN, min(VALUE_CLAMP_MAX, value))


def _time_or_now(value: Any) -> str:
    """Return ``value`` as a timestamp string truncated to 25 chars, or "now" when it is empty."""
    return str(value or _utc_now_iso())[:25]


def _trim_oldest(conn: sqlite3.Connection, table: str, max_rows: int) -> None:
    """Delete the rows with the smallest ``time`` until ``table`` holds at most ``max_rows`` rows.

    ``table`` is always one of this module's own table names, never external input, which is
    why it can be interpolated into the SQL.
    """
    count = conn.execute(f"SELECT COUNT(*) FROM {table}").fetchone()[0]
    excess = count - max_rows
    if excess > 0:
        conn.execute(
            f"DELETE FROM {table} WHERE id IN (SELECT id FROM {table} ORDER BY time ASC LIMIT ?)",
            (excess,),
        )


def _merge_situation_update(conn: sqlite3.Connection, u: Any) -> bool:
    """Insert one situation_update row unless an identical one (same id) already exists."""
    if not isinstance(u, dict):
        logger.warning("situation_update is not a mapping; skipped")
        return False
    summary = str(u.get("summary") or "")
    timestamp = str(u.get("timestamp") or "")
    # hash() on str is salted per process (PYTHONHASHSEED), so it cannot produce an id that is
    # stable across crawler runs. A sha1 digest keeps INSERT OR IGNORE de-duplicating the same
    # report, while keeping the same "ai_<up to 10 digits>" id format.
    digest = hashlib.sha1((summary + timestamp).encode("utf-8")).hexdigest()
    uid = f"ai_{int(digest, 16) % 10**10}"
    cur = conn.execute(
        "INSERT OR IGNORE INTO situation_update (id, timestamp, category, summary, severity) VALUES (?, ?, ?, ?, ?)",
        (uid, timestamp, str(u.get("category") or "other"), summary[:500], str(u.get("severity") or "medium")),
    )
    return cur.rowcount > 0


def _apply_combat_loss_delta(conn: sqlite3.Connection, side: str, delta: Dict[str, Any]) -> bool:
    """Add one side's clamped per-article increments onto its stored combat_losses totals."""
    column_list = ", ".join(_COMBAT_LOSS_COLUMNS)
    row = conn.execute(f"SELECT {column_list} FROM combat_losses WHERE side = ?", (side,)).fetchone()
    stored = dict(zip(_COMBAT_LOSS_COLUMNS, row)) if row else {}
    values: Dict[str, Any] = {}
    for column in _COMBAT_LOSS_COLUMNS:
        base = stored.get(column) or 0
        if column in _COMBAT_LOSS_CARRIED_COLUMNS:
            values[column] = base
        else:
            values[column] = max(0, base + _clamp_delta(column, delta.get(column, 0)))
    values["updated_at"] = _utc_now_iso()
    params = list(values.values())
    # Column names below come from the module constants above, never from input.
    if row:
        assignments = ", ".join(f"{name}=?" for name in values)
        cur = conn.execute(f"UPDATE combat_losses SET {assignments} WHERE side=?", (*params, side))
    else:
        names = ", ".join(values)
        marks = ", ".join(["?"] * len(values))
        cur = conn.execute(
            f"INSERT OR REPLACE INTO combat_losses (side, {names}) VALUES (?, {marks})",
            (side, *params),
        )
    return cur.rowcount > 0


def _merge_combat_losses(conn: sqlite3.Connection, deltas: Any) -> bool:
    """Apply per-side combat_losses increments; failures are logged per side and skipped."""
    if not isinstance(deltas, dict):
        logger.warning("combat_losses_delta is not a mapping; skipped")
        return False
    changed = False
    for side, delta in deltas.items():
        if side not in _SIDES or not isinstance(delta, dict):
            continue
        try:
            changed = _apply_combat_loss_delta(conn, side, delta) or changed
        except (sqlite3.Error, TypeError, ValueError) as exc:
            logger.warning("combat_losses merge failed for side=%s: %s", side, exc)
    return changed


def _merge_force_summary(conn: sqlite3.Connection, deltas: Any) -> bool:
    """Add missiles consumed per side (and take them out of missile_stock, floored at 0)."""
    if not isinstance(deltas, dict):
        logger.warning("force_summary_delta is not a mapping; skipped")
        return False
    changed = False
    for side, delta in deltas.items():
        if side not in _SIDES or not isinstance(delta, dict):
            continue
        consumed = delta.get("missile_consumed")
        # `not consumed > 0` also rejects NaN.
        if not isinstance(consumed, (int, float)) or not consumed > 0:
            continue
        try:
            count = min(int(consumed), _MAX_MISSILES_CONSUMED_PER_MERGE)
            cur = conn.execute(
                "UPDATE force_summary SET missile_consumed = missile_consumed + ?, missile_stock = max(0, missile_stock - ?) WHERE side = ?",
                (count, count, side),
            )
        except (sqlite3.Error, OverflowError) as exc:
            # force_summary is not created by _ensure_tables; this stays best-effort.
            logger.warning("force_summary update failed for side=%s: %s", side, exc)
            continue
        if cur.rowcount > 0:
            changed = True
    return changed


def _merge_retaliation(conn: sqlite3.Connection, r: Any) -> bool:
    """Blend a new retaliation reading into the current value and append it to the history."""
    raw_value = _as_int(r.get("value", 50)) if isinstance(r, dict) else None
    if raw_value is None:
        logger.warning("retaliation value missing or not numeric; skipped")
        return False
    raw = _clamp_pct(raw_value)
    row = conn.execute("SELECT value FROM retaliation_current WHERE id = 1").fetchone()
    current = _clamp_pct(int(row[0]) if row else 50)
    # Weighted average of the stored and new value, so one article cannot swing the gauge
    # to an extreme.
    new_val = _clamp_pct(round(RETALIATION_SMOOTH_WEIGHT * current + (1 - RETALIATION_SMOOTH_WEIGHT) * raw))
    ts = _time_or_now(r.get("time"))
    conn.execute("INSERT OR REPLACE INTO retaliation_current (id, value) VALUES (1, ?)", (new_val,))
    conn.execute("INSERT INTO retaliation_history (time, value) VALUES (?, ?)", (ts, new_val))
    _trim_oldest(conn, "retaliation_history", RETALIATION_HISTORY_MAX_ROWS)
    return True


def _merge_wall_street(conn: sqlite3.Connection, w: Any) -> bool:
    """Append a clamped wall_street_trend point and trim the table to its row limit."""
    raw_value = _as_int(w.get("value", 50)) if isinstance(w, dict) else None
    if raw_value is None:
        logger.warning("wall_street value missing or not numeric; skipped")
        return False
    conn.execute(
        "INSERT INTO wall_street_trend (time, value) VALUES (?, ?)",
        (_time_or_now(w.get("time")), _clamp_pct(raw_value)),
    )
    _trim_oldest(conn, "wall_street_trend", WALL_STREET_TREND_MAX_ROWS)
    return True


def _merge_key_locations(conn: sqlite3.Connection, updates: Any, event_time: str) -> bool:
    """Mark key locations as attacked.

    US bases that were struck use side=us; Iranian facilities use side=iran. Locations are
    matched by any of the name keywords (LIKE %kw%). ``name_keywords`` may be a string
    separated by whitespace or "|", or a list of strings.
    """
    if not isinstance(updates, (list, tuple)):
        logger.warning("key_location_updates is not a list; skipped")
        return False
    changed = False
    for u in updates:
        if not isinstance(u, dict):
            continue
        keywords = u.get("name_keywords")
        if isinstance(keywords, (list, tuple)):
            keywords = " ".join(str(k) for k in keywords)
        kw = str(keywords or "").replace("|", " ").split()
        side = u.get("side")
        if not kw or side not in _SIDES:
            continue
        status = str(u.get("status") or "attacked")[:20]
        dmg = u.get("damage_level", 2)
        attacked_at = str(u.get("attacked_at") or event_time)[:25]
        like_params = [f"%{k}%" for k in kw]
        conditions = " OR ".join("name LIKE ?" for _ in kw)
        try:
            try:
                cur = conn.execute(
                    f"UPDATE key_location SET status=?, damage_level=?, attacked_at=? WHERE side=? AND ({conditions})",
                    [status, dmg, attacked_at, side, *like_params],
                )
            except sqlite3.OperationalError:
                # Fallback presumably for key_location tables without an attacked_at column.
                cur = conn.execute(
                    f"UPDATE key_location SET status=?, damage_level=? WHERE side=? AND ({conditions})",
                    [status, dmg, side, *like_params],
                )
        except sqlite3.Error as exc:
            logger.warning("key_location update failed for %r: %s", kw, exc)
            continue
        if cur.rowcount > 0:
            changed = True
    return changed


def _merge_strike_sources(conn: sqlite3.Connection, sources: Any) -> bool:
    """Insert or overwrite map strike sources (frontend mapData.strikeSources)."""
    if not isinstance(sources, (list, tuple)):
        logger.warning("map_strike_sources is not a list; skipped")
        return False
    changed = False
    for s in sources:
        if not isinstance(s, dict):
            continue
        sid = str(s.get("id") or "").strip()
        if not sid:
            continue
        name = str(s.get("name") or "").strip() or sid
        try:
            lng = float(s.get("lng", 0))
            lat = float(s.get("lat", 0))
        except (TypeError, ValueError):
            logger.warning("map_strike_source %r has invalid coordinates; skipped", sid)
            continue
        try:
            cur = conn.execute(
                "INSERT OR REPLACE INTO map_strike_source (id, name, lng, lat) VALUES (?, ?, ?, ?)",
                (sid, name[:200], lng, lat),
            )
        except sqlite3.Error as exc:
            logger.warning("map_strike_source %r insert failed: %s", sid, exc)
            continue
        if cur.rowcount > 0:
            changed = True
    return changed


def _merge_strike_lines(conn: sqlite3.Connection, lines: Any) -> bool:
    """Append map strike lines (frontend mapData.strikeLines) so new attacks can be animated."""
    if not isinstance(lines, (list, tuple)):
        logger.warning("map_strike_lines is not a list; skipped")
        return False
    changed = False
    for line in lines:
        if not isinstance(line, dict):
            continue
        source_id = str(line.get("source_id") or "").strip()
        if not source_id:
            continue
        try:
            target_lng = float(line.get("target_lng", 0))
            target_lat = float(line.get("target_lat", 0))
        except (TypeError, ValueError):
            logger.warning("map_strike_line from %r has invalid target coordinates; skipped", source_id)
            continue
        target_name = str(line.get("target_name") or "").strip()[:200] or None
        struck_at = str(line.get("struck_at") or "").strip() or None
        try:
            cur = conn.execute(
                "INSERT INTO map_strike_line (source_id, target_lng, target_lat, target_name, struck_at) VALUES (?, ?, ?, ?, ?)",
                (source_id, target_lng, target_lat, target_name, struck_at),
            )
        except sqlite3.Error as exc:
            logger.warning("map_strike_line insert failed for %r: %s", source_id, exc)
            continue
        if cur.rowcount > 0:
            changed = True
    return changed


def merge(extracted: Dict[str, Any], db_path: Optional[str] = None) -> bool:
    """Merge one AI extraction result into the SQLite database.

    Recognised (optional) keys of ``extracted``:
      situation_update, combat_losses_delta, force_summary_delta, retaliation, wall_street,
      key_location_updates (with ``_event_time`` as the default attack time),
      map_strike_sources, map_strike_lines.
    Malformed entries are logged and skipped instead of aborting the rest of the merge.

    Args:
        extracted: the parsed extractor output.
        db_path: database file; defaults to ``DB_PATH``.

    Returns:
        True if any row was inserted or updated (the situation row is then touched), False
        if nothing changed or the database file does not exist (it is never created here).

    Raises:
        Any exception raised outside the per-item best-effort handling is re-raised after the
        whole merge has been rolled back.
    """
    path = db_path or DB_PATH
    if not os.path.exists(path):
        return False
    conn = sqlite3.connect(path, timeout=10)
    try:
        _ensure_tables(conn)
        updated = False
        if "situation_update" in extracted:
            updated |= _merge_situation_update(conn, extracted["situation_update"])
        # combat_losses is always treated as an increment: the AI reports the new losses from
        # this article, which are added onto the stored totals.
        if "combat_losses_delta" in extracted:
            updated |= _merge_combat_losses(conn, extracted["combat_losses_delta"])
        if "force_summary_delta" in extracted:
            updated |= _merge_force_summary(conn, extracted["force_summary_delta"])
        if "retaliation" in extracted:
            updated |= _merge_retaliation(conn, extracted["retaliation"])
        if "wall_street" in extracted:
            updated |= _merge_wall_street(conn, extracted["wall_street"])
        if "key_location_updates" in extracted:
            event_time = str(extracted.get("_event_time") or _utc_now_iso())
            updated |= _merge_key_locations(conn, extracted["key_location_updates"], event_time)
        if "map_strike_sources" in extracted:
            updated |= _merge_strike_sources(conn, extracted["map_strike_sources"])
        if "map_strike_lines" in extracted:
            updated |= _merge_strike_lines(conn, extracted["map_strike_lines"])
        if updated:
            # NOTE(review): this also resets situation.data to '{}' (unchanged from before);
            # confirm the server treats that column as disposable.
            conn.execute(
                "INSERT OR REPLACE INTO situation (id, data, updated_at) VALUES (1, '{}', ?)",
                (_utc_now_iso(),),
            )
        conn.commit()
        return updated
    except Exception:
        conn.rollback()
        raise
    finally:
        conn.close()