This commit is contained in:
Daniel
2026-03-04 16:48:17 +08:00
parent 64f4c438c3
commit 26938449f0
34 changed files with 956 additions and 500 deletions

View File

@@ -14,16 +14,17 @@
**数据偏老原因**:未传 `timespan``sort=datedesc`API 返回 3 个月内“最相关”文章,不保证最新。
### 2. RSS 新闻 (situation_update) — 主事件脉络来源
### 2. RSS 新闻 → 看板实时数据(主输出)+ 事件脉络
| 项目 | 说明 |
|------|------|
| 源 | 多国主流媒体:美(Reuters/NYT)、英(BBC)、法(France 24)、俄(TASS/RT)、中(Xinhua/CGTN)、伊(Press TV)、卡塔尔(Al Jazeera) |
| **主输出** | **看板实时数据**战损combat_losses、据点状态key_location、冲突事件gdelt_events、统计conflict_stats供前端战损/基地/地图等面板展示。 |
| 辅助输出 | 事件脉络situation_update时间线摘要非主展示目标。 |
| 源 | 多国主流媒体:美/英/法/俄/中/伊/卡塔尔等(见 `config.RSS_FEEDS` |
| 过滤 | 标题/摘要需含 `KEYWORDS` 之一iran、usa、strike、military 等) |
| 更新 | 爬虫 45 秒拉一次(`RSS_INTERVAL_SEC`),优先保证事件脉络 |
| 优先级 | 启动时先拉 RSS再拉 GDELT |
| 更新 | 爬虫 `RSS_INTERVAL_SEC` 拉取;每 `BACKFILL_CYCLES` 轮会从近期事件回填一次战损/据点,保证面板数据与最新内容一致。 |
**GDELT 无法访问时**:设置 `GDELT_DISABLED=1`,仅用 RSS 新闻即可维持事件脉络。部分境外源可能受网络限制
**GDELT 无法访问时**:设置 `GDELT_DISABLED=1`,仅用 RSS部分境外源可能需代理
### 3. AI 新闻清洗与分类(可选)
@@ -34,7 +35,7 @@
---
**事件脉络可实时更新**:爬虫抓取后 → 写入 SQLite → 调用 Node 通知 → WebSocket 广播 → 前端自动刷新
**看板实时数据更新**:爬虫抓取 → 提取战损/据点等 → 写入 combat_losses、key_location 等 → 调用 Node 通知 → WebSocket 广播 → 前端战损/基地/地图等面板刷新。事件脉络(时间线)为同一流水线的辅助输出
## 依赖
@@ -141,6 +142,7 @@ npm run crawler:test:extraction # 规则/db_merge 测试
| **地图冲突点** (conflictEvents) | gdelt_events 或 RSS→gdelt 回填 | ✅ 是 | GDELT 或 GDELT 禁用时由 situation_update 同步到 gdelt_events |
| **战损/装备毁伤** (combatLosses) | combat_losses | ⚠️ 有条件 | 仅当 AI/规则从新闻中提取到数字如「2 名美军死亡」merge 才写入增量 |
| **基地/地点状态** (keyLocations) | key_location | ⚠️ 有条件 | 仅当提取到 key_location_updates如某基地遭袭时更新 |
| **地图打击/攻击动画** (mapData.strikeSources, strikeLines) | map_strike_source, map_strike_line | ⚠️ 有条件 | 仅当提取到 map_strike_sources / map_strike_lines 时写入;格式见下「地图打击数据」 |
| **力量摘要/指数/资产** (summary, powerIndex, assets) | force_summary, power_index, force_asset | ❌ 否 | 仅 seed 初始化,爬虫不写 |
| **华尔街/报复情绪** (wallStreet, retaliation) | wall_street_trend, retaliation_* | ⚠️ 有条件 | 仅当提取器输出对应字段时更新 |
@@ -255,6 +257,32 @@ npm run crawler:test
---
## 数据流与 AI 自检
**完整链路**RSS 抓取 → 关键词过滤 → 翻译/清洗 → 去重news_content→ 写 situation_update → 正文抓取(可选)→ **AI 提取**(战损/基地等)→ db_merge 写 combat_losses/key_location 等 → POST /api/crawler/notify → Node 重载并广播。
| 环节 | 说明 | 自检 |
|------|------|------|
| 抓取 | `scrapers/rss_scraper.fetch_all()`,按 KEYWORDS 过滤 | `npm run crawler:test` 看条数 |
| 去重 | `news_storage.save_and_dedup()`content_hash 落库 news_content | 查 `news_content` 表条数 |
| 事件脉络 | `db_writer.write_updates()` 写 situation_update与 pipeline 使用同一 db_path | 查 `situation_update` 表 |
| AI 提取 | 战损/基地等:**有 DASHSCOPE_API_KEY 用通义****否则 CLEANER_AI_DISABLED=1 用规则**,否则用 **Ollama**extractor_ai | 见下 |
| 分类/严重度 | 每条 RSS 的 category/severity**PARSER_AI_DISABLED=1 用规则**,否则 DashScope 或 Ollama | 无 AI 时设 `PARSER_AI_DISABLED=1` 可正常跑 |
**如何保证「面板实时数据」有更新**(战损、据点等):
- **推荐**:设 `CLEANER_AI_DISABLED=1` → 使用 `extractor_rules`(纯规则),无需 Ollama/通义,即可从新闻中提取战损/基地并写入 combat_losses、key_location。
- 或设 `DASHSCOPE_API_KEY` → 用通义做更细的提取。
- 否则用 `extractor_ai`(需本机 `ollama run llama3.1`),未起则提取静默失败、面板数字不更新。
- 服务会每 `BACKFILL_CYCLES` 轮(默认 2 轮)从近期事件再跑一次提取并合并,保证战损/据点与最新内容一致。
**常见 bug 与修复**
- **事件脉络有、战损/基地不更新**:多为 AI 未跑通Ollama 未起且未设 DashScope、未设 CLEANER_AI_DISABLED。可设 `CLEANER_AI_DISABLED=1` 用规则提取,或起 Ollama / 配置 DashScope。
- **多 DB 路径不一致**pipeline 已统一 `db_path``write_updates``save_and_dedup``merge` 均使用同一 path`config.DB_PATH`)。
---
## 优化后验证效果示例
以下为「正文抓取 + AI 精确提取 + 增量与地点更新」优化后,单条新闻从输入到前端展示的完整示例,便于对照验证。
@@ -321,6 +349,15 @@ print('key_location_updates:', out.get('key_location_updates'))
期望:`combat_losses_delta.us` 含 personnel_killed=2、personnel_wounded=14、aircraft=1 等增量;`key_location_updates` 含阿萨德 side=us 等条目。
### 地图打击数据(与前端攻击动画统一格式)
爬虫/AI 若输出以下字段,`db_merge` 会写入 `map_strike_source``map_strike_line``GET /api/situation``mapData.strikeSources` / `mapData.strikeLines` 会更新,前端可直接追加打击线与飞行动画。
- **map_strike_sources**(可选):`[{ "id": "israel"|"lincoln"|"ford", "name": "显示名", "lng": 经度, "lat": 纬度 }]`,与 seed 中打击源 id 一致时可覆盖位置。
- **map_strike_lines**(可选):`[{ "source_id": "israel"|"lincoln"|"ford", "target_lng", "target_lat", "target_name": "目标名", "struck_at": "ISO 时间" }]`,每条追加一条打击线(不删已有),便于按时间回放。
示例:`{ "map_strike_lines": [{ "source_id": "israel", "target_lng": 51.916, "target_lat": 33.666, "target_name": "纳坦兹", "struck_at": "2026-03-01T02:04:00.000Z" }] }`
---
## 冲突强度 (impact_score)

View File

@@ -1,7 +1,12 @@
# -*- coding: utf-8 -*-
"""
将 AI 提取的结构化数据合并到 SQLite
与 panel schema 及 situationData.getSituation 对齐,支持回放
与 panel schema 及 situationData.getSituation 对齐,支持回放
地图打击数据(与前端攻击动画一致):
- map_strike_sources: [{ "id": "israel"|"lincoln"|"ford", "name": "显示名", "lng", "lat" }] 写入 map_strike_source
- map_strike_lines: [{ "source_id", "target_lng", "target_lat", "target_name?", "struck_at?" }] 追加到 map_strike_line
爬虫/AI 可按此格式输出,落库后 GET /api/situation 的 mapData.strikeSources/strikeLines 会更新,前端直接追加攻击动画。
"""
import os
import sqlite3
@@ -67,6 +72,37 @@ def _ensure_tables(conn: sqlite3.Connection) -> None:
conn.execute("CREATE TABLE IF NOT EXISTS retaliation_current (id INTEGER PRIMARY KEY CHECK (id = 1), value INTEGER NOT NULL)")
conn.execute("CREATE TABLE IF NOT EXISTS retaliation_history (id INTEGER PRIMARY KEY AUTOINCREMENT, time TEXT NOT NULL, value INTEGER NOT NULL)")
conn.execute("CREATE TABLE IF NOT EXISTS situation (id INTEGER PRIMARY KEY CHECK (id = 1), data TEXT NOT NULL, updated_at TEXT NOT NULL)")
# 地图打击源与打击线(与 server/db.js 一致),供 getSituation mapData 与前端攻击动画使用
conn.execute("""
CREATE TABLE IF NOT EXISTS map_strike_source (
id TEXT PRIMARY KEY,
name TEXT NOT NULL,
lng REAL NOT NULL,
lat REAL NOT NULL
)
""")
conn.execute("""
CREATE TABLE IF NOT EXISTS map_strike_line (
source_id TEXT NOT NULL,
target_lng REAL NOT NULL,
target_lat REAL NOT NULL,
target_name TEXT,
struck_at TEXT,
FOREIGN KEY (source_id) REFERENCES map_strike_source(id)
)
""")
try:
conn.execute("CREATE INDEX IF NOT EXISTS idx_map_strike_line_source ON map_strike_line(source_id)")
except sqlite3.OperationalError:
pass
try:
for col in ("struck_at",):
cur = conn.execute("PRAGMA table_info(map_strike_line)")
cols = [r[1] for r in cur.fetchall()]
if col not in cols:
conn.execute(f"ALTER TABLE map_strike_line ADD COLUMN {col} TEXT")
except sqlite3.OperationalError:
pass
conn.commit()
@@ -183,6 +219,41 @@ def merge(extracted: Dict[str, Any], db_path: Optional[str] = None) -> bool:
updated = True
except Exception:
pass
# map_strike_source打击源与前端 mapData.strikeSources 一致),爬虫可补充或覆盖
if "map_strike_sources" in extracted:
try:
for s in extracted["map_strike_sources"]:
sid = (s.get("id") or "").strip()
name = (s.get("name") or "").strip() or sid
lng = float(s.get("lng", 0))
lat = float(s.get("lat", 0))
if sid:
conn.execute(
"INSERT OR REPLACE INTO map_strike_source (id, name, lng, lat) VALUES (?, ?, ?, ?)",
(sid, name[:200], lng, lat),
)
if conn.total_changes > 0:
updated = True
except Exception:
pass
# map_strike_lines打击线与前端 mapData.strikeLines 一致),爬虫可追加新打击,便于前端追加攻击动画
if "map_strike_lines" in extracted:
try:
for line in extracted["map_strike_lines"]:
source_id = (line.get("source_id") or "").strip()
target_lng = float(line.get("target_lng", 0))
target_lat = float(line.get("target_lat", 0))
target_name = (line.get("target_name") or "").strip()[:200] or None
struck_at = (line.get("struck_at") or "").strip() or None
if source_id:
conn.execute(
"INSERT INTO map_strike_line (source_id, target_lng, target_lat, target_name, struck_at) VALUES (?, ?, ?, ?, ?)",
(source_id, target_lng, target_lat, target_name, struck_at),
)
if conn.total_changes > 0:
updated = True
except Exception:
pass
if updated:
conn.execute("INSERT OR REPLACE INTO situation (id, data, updated_at) VALUES (1, '{}', ?)", (datetime.utcnow().strftime("%Y-%m-%dT%H:%M:%S.000Z"),))
conn.commit()

View File

@@ -4,6 +4,7 @@ import sqlite3
import hashlib
import os
from datetime import datetime, timezone
from typing import Optional
from config import DB_PATH
@@ -73,14 +74,29 @@ def touch_situation_updated_at(conn: sqlite3.Connection) -> None:
conn.commit()
def write_updates(updates: list[dict]) -> int:
def touch_situation_updated_at_path(db_path: Optional[str] = None) -> bool:
"""仅更新 situation.updated_at 为当前时间(每次爬虫运行都调用,便于前端显示「最后抓取时间」)。返回是否成功。"""
path = db_path or DB_PATH
if not os.path.exists(path):
return False
conn = sqlite3.connect(path, timeout=10)
try:
touch_situation_updated_at(conn)
return True
finally:
conn.close()
def write_updates(updates: list[dict], db_path: Optional[str] = None) -> int:
"""
updates: [{"title","summary","url","published","category","severity"}, ...]
db_path: 与 pipeline 一致,缺省用 config.DB_PATH
返回新增条数。
"""
if not os.path.exists(DB_PATH):
path = db_path or DB_PATH
if not os.path.exists(path):
return 0
conn = sqlite3.connect(DB_PATH, timeout=10)
conn = sqlite3.connect(path, timeout=10)
try:
count = 0
for u in updates:

View File

@@ -8,6 +8,7 @@ from datetime import datetime, timezone
from typing import Callable, Optional, Tuple
from config import DB_PATH, API_BASE
from db_writer import touch_situation_updated_at_path
def _notify_api(api_base: str) -> bool:
@@ -172,15 +173,18 @@ def run_full_pipeline(
except Exception as e:
print(f" [warn] 正文抓取: {e}")
# 4. 映射到前端库字段并更新表
n_panel = write_updates(new_items) if new_items else 0
# 4. 映射到前端库字段并更新表(与去重/AI 使用同一 db path
n_panel = write_updates(new_items, db_path=path) if new_items else 0
if new_items:
_extract_and_merge(new_items, path)
# 5. 通知(有新增时才通知;可选:先执行外部逻辑如 GDELT 回填,再通知
# 4.5 每次运行都刷新 situation.updated_at便于前端显示「最后抓取时间」否则只有新增条目时才更新数据会一直停在旧日期
touch_situation_updated_at_path(db_path=path)
# 5. 通知(每次运行都通知,让 API 重载并广播最新 lastUpdated
if on_notify:
on_notify()
if notify and (n_panel > 0 or n_news > 0):
if notify:
_notify_api(base)
return len(items), n_news, n_panel

View File

@@ -297,8 +297,43 @@ def _rss_to_gdelt_fallback() -> None:
LAST_FETCH = {"items": 0, "inserted": 0, "error": None}
def _refresh_panel_data() -> int:
"""从近期事件重新提取并合并战损/据点等面板实时数据,不依赖本轮是否有新 RSS。返回合并条数。"""
if not os.path.exists(DB_PATH):
return 0
try:
from db_merge import merge
use_dashscope = bool(os.environ.get("DASHSCOPE_API_KEY", "").strip())
if use_dashscope:
from extractor_dashscope import extract_from_news
elif os.environ.get("CLEANER_AI_DISABLED", "0") == "1":
from extractor_rules import extract_from_news
else:
from extractor_ai import extract_from_news
conn = sqlite3.connect(DB_PATH, timeout=10)
rows = conn.execute(
"SELECT id, timestamp, category, summary FROM situation_update ORDER BY timestamp DESC LIMIT 50"
).fetchall()
conn.close()
merged = 0
for r in rows:
uid, ts, cat, summary = r
text = ((cat or "") + " " + (summary or "")).strip()
if len(text) < 20:
continue
try:
extracted = extract_from_news(text, timestamp=ts)
if extracted and merge(extracted, db_path=DB_PATH):
merged += 1
except Exception:
pass
return merged
except Exception:
return 0
def fetch_news() -> None:
"""执行完整写库流水线GDELT 禁用时用 RSS 回填 gdelt_events,再通知 Node"""
"""执行完整写库流水线;产出看板实时数据(战损、据点、冲突事件)+ 事件脉络。GDELT 禁用时用 RSS 回填 gdelt_events。"""
try:
from pipeline import run_full_pipeline
LAST_FETCH["error"] = None
@@ -314,7 +349,7 @@ def fetch_news() -> None:
_rss_to_gdelt_fallback()
_notify_node()
ts = datetime.now().strftime("%H:%M:%S")
print(f"[{ts}] RSS 抓取 {n_fetched} 条,去重新增 {n_news}资讯,写入事件脉络 {n_panel}")
print(f"[{ts}] 抓取 {n_fetched} 条,去重新增 {n_news},写脉络 {n_panel} → 面板实时数据(战损/据点)已由本批提取更新")
if n_fetched == 0:
print(f"[{ts}] 0 条检查网络、RSS 源或 KEYWORDS 过滤)")
except Exception as e:
@@ -322,6 +357,10 @@ def fetch_news() -> None:
print(f"[{datetime.now().strftime('%H:%M:%S')}] 新闻抓取失败: {e}")
# 每 N 轮做一次「从近期事件回填面板实时数据」,保证战损/据点等与最新内容一致
BACKFILL_CYCLES = int(os.environ.get("BACKFILL_CYCLES", "2"))
_cycle_count = 0
# ==========================
# 定时任务asyncio 后台任务,避免 APScheduler executor 关闭竞态)
# ==========================
@@ -329,11 +368,20 @@ _bg_task: Optional[asyncio.Task] = None
async def _periodic_fetch() -> None:
global _cycle_count
loop = asyncio.get_event_loop()
while True:
try:
await loop.run_in_executor(None, fetch_news)
await loop.run_in_executor(None, fetch_gdelt_events)
_cycle_count += 1
if _cycle_count >= BACKFILL_CYCLES:
_cycle_count = 0
merged = _refresh_panel_data()
if merged > 0:
_notify_node()
ts = datetime.now().strftime("%H:%M:%S")
print(f"[{ts}] 面板实时数据回填:从近期事件合并 {merged} 条(战损/据点)")
except asyncio.CancelledError:
break
except Exception as e:

View File

@@ -88,7 +88,6 @@ def fetch_all() -> list[dict]:
if key in seen:
continue
seen.add(key)
# 写入 DB 的 schema 不包含 source可后续扩展
items.append({k: v for k, v in item.items() if k != "source"})
items.append(item)
return items