diff --git a/crawler/README.md b/crawler/README.md index 0280af6..4a4991d 100644 --- a/crawler/README.md +++ b/crawler/README.md @@ -54,10 +54,20 @@ pip install -r requirements.txt **事件脉络不更新时**:多半是未启动 `npm run gdelt`。只跑 `npm run api` 时,事件脉络会显示空或仅有缓存。 +## 写库流水线(与 server/README 第五节一致) + +RSS 与主入口均走统一流水线 `pipeline.run_full_pipeline`: + +1. **抓取** → 2. **AI 清洗**(标题/摘要/分类)→ 3. **去重**(news_content.content_hash)→ 4. **映射到前端库字段**(situation_update、combat_losses、key_location 等)→ 5. **更新表** → 6. **有新增时 POST /api/crawler/notify** + +- `npm run crawler`(main.py)与 `npm run gdelt`(realtime_conflict_service)的 RSS 分支都调用该流水线。 +- 实现见 `crawler/pipeline.py`。 + ## 数据流 ``` GDELT API → 抓取(60s) → SQLite (gdelt_events, conflict_stats) → POST /api/crawler/notify +RSS → 抓取 → 清洗 → 去重 → 写 news_content / situation_update / 战损等 → POST /api/crawler/notify ↓ Node 更新 situation.updated_at + WebSocket 广播 ↓ @@ -79,6 +89,83 @@ GDELT API → 抓取(60s) → SQLite (gdelt_events, conflict_stats) → POST /ap - `OLLAMA_MODEL`: AI 分类模型,默认 `llama3.1` - `PARSER_AI_DISABLED`: 设为 `1` 则禁用 AI 分类,仅用规则 - `CLEANER_AI_DISABLED`: 设为 `1` 则禁用 AI 清洗,仅用规则截断 +- `FETCH_FULL_ARTICLE`: 设为 `0` 则不再抓取正文,仅用标题+摘要做 AI 提取(默认 `1` 抓取正文) +- `ARTICLE_FETCH_LIMIT`: 每轮为多少条新资讯抓取正文,默认 10 +- `ARTICLE_FETCH_TIMEOUT`: 单篇正文请求超时(秒),默认 12 +- `ARTICLE_MAX_BODY_CHARS`: 正文最大字符数,默认 6000 +- `EXTRACT_TEXT_MAX_LEN`: 送入 AI 提取的原文最大长度,默认 4000 + +**增量与地点**:战损一律按**增量**处理——AI 只填本则报道的「本次/此次」新增数,不填累计总数;合并时与库内当前值叠加。双方攻击地点通过 `key_location_updates` 更新(美军基地被打击 side=us,伊朗设施被打击 side=iran),会写入 `key_location` 的 status/damage_level。 + +--- + +## 优化后验证效果示例 + +以下为「正文抓取 + AI 精确提取 + 增量与地点更新」优化后,单条新闻从输入到前端展示的完整示例,便于对照验证。 + +### 1. 示例输入(新闻摘要/全文片段) + +``` +伊朗向伊拉克阿萨德空军基地发射 12 枚弹道导弹,造成此次袭击中 2 名美军人员死亡、14 人受伤, +另有 1 架战机在跑道受损。乌代德基地未遭直接命中。同日以色列对伊朗伊斯法罕一处设施发动打击。 +``` + +### 2. AI 提取输出(增量 + 攻击地点) + +```json +{ + "summary": "伊朗导弹袭击伊拉克阿萨德基地致美军 2 死 14 伤,1 架战机受损;以军打击伊斯法罕。", + "category": "alert", + "severity": "high", + "us_personnel_killed": 2, + "us_personnel_wounded": 14, + "us_aircraft": 1, + "us_bases_damaged": 1, + "key_location_updates": [ + { "name_keywords": "阿萨德|asad|al-asad", "side": "us", "status": "attacked", "damage_level": 2 }, + { "name_keywords": "伊斯法罕|isfahan", "side": "iran", "status": "attacked", "damage_level": 1 } + ] +} +``` + +说明:战损为**本则报道的新增数**(此次 2 死、14 伤、1 架战机),不是累计总数;地点为双方遭袭设施(美军基地 side=us,伊朗设施 side=iran)。 + +### 3. 合并后数据库变化 + +| 表/字段 | 合并前 | 本则增量 | 合并后 | +|--------|--------|----------|--------| +| combat_losses.us.personnel_killed | 127 | +2 | 129 | +| combat_losses.us.personnel_wounded | 384 | +14 | 398 | +| combat_losses.us.aircraft | 2 | +1 | 3 | +| combat_losses.us.bases_damaged | 27 | +1 | 28 | +| key_location(name 含「阿萨德」) | status=operational | — | status=attacked, damage_level=2 | +| key_location(name 含「伊斯法罕」) | status=operational | — | status=attacked, damage_level=1 | + +若 AI 误提「累计 2847 人丧生」并填成 personnel_killed=2847,单次合并会被上限截断(如最多 +500),避免一次写入导致数据剧增。 + +### 4. 前端验证效果 + +- **事件脉络**:出现一条新条目,summary 为上述 1–2 句概括,category=alert、severity=high。 +- **装备毁伤面板**:美军「阵亡」+2、「受伤」+14、「战机」+1;基地毁/损数字随 bases_damaged +1 更新。 +- **地图**:阿萨德基地、伊斯法罕对应点位显示为「遭袭」状态(脉冲/标色随现有地图逻辑)。 +- **API**:`GET /api/situation` 中 `usForces.combatLosses`、`usForces.keyLocations`(含 status/damage_level)为更新后值;`lastUpdated` 为合并后时间。 + +### 5. 快速自测命令 + +```bash +# 仅测提取逻辑(不写库):用示例文本调 AI 提取,看是否得到增量 + key_location_updates +cd crawler && python3 -c " +from extractor_ai import extract_from_news +text = '''伊朗向伊拉克阿萨德空军基地发射导弹,此次袭击造成 2 名美军死亡、14 人受伤,1 架战机受损。''' +out = extract_from_news(text) +print('combat_losses_delta:', out.get('combat_losses_delta')) +print('key_location_updates:', out.get('key_location_updates')) +" +``` + +期望:`combat_losses_delta.us` 含 personnel_killed=2、personnel_wounded=14、aircraft=1 等增量;`key_location_updates` 含阿萨德 side=us 等条目。 + +--- ## 冲突强度 (impact_score) @@ -93,6 +180,75 @@ GDELT API → 抓取(60s) → SQLite (gdelt_events, conflict_stats) → POST /ap - `GET http://localhost:8000/events`:返回事件列表与冲突统计(Python 服务直连) - `GET http://localhost:3001/api/events`:从 Node 读取(推荐,含 WebSocket 同步) +## 本地验证链路 + +按下面任选一种方式,确认「抓取 → 清洗 → 去重 → 映射 → 写表 → 通知」整条链路正常。 + +### 方式一:最小验证(不启动前端) + +1. **启动 API(必须)** + ```bash + npm run api + ``` + 保持运行,默认 `http://localhost:3001`。 + +2. **安装爬虫依赖并跑一轮流水线** + ```bash + cd crawler && pip install -r requirements.txt + python -c " + from pipeline import run_full_pipeline + from config import DB_PATH, API_BASE + n_fetched, n_news, n_panel = run_full_pipeline(db_path=DB_PATH, api_base=API_BASE, translate=True, notify=True) + print('抓取:', n_fetched, '去重新增:', n_news, '面板写入:', n_panel) + " + ``` + - 有网络且有关键词命中时,应看到非零数字;无网络或全被过滤则为 `0 0 0`。 + - 若报错 `module 'socket' has no attribute 'settimeout'`,已修复为 `setdefaulttimeout`,请拉取最新代码。 + +3. **查库确认** + ```bash + sqlite3 server/data.db "SELECT COUNT(*) FROM situation_update; SELECT COUNT(*) FROM news_content;" + ``` + 或浏览器打开 `http://localhost:3001/api/db/dashboard`,看 `situation_update`、`news_content` 是否有数据。 + +4. **确认态势接口** + ```bash + curl -s http://localhost:3001/api/situation | head -c 500 + ``` + 应包含 `lastUpdated`、`recentUpdates` 等。 + +### 方式二:用现有验证脚本(推荐) + +1. 终端 1:`npm run api` +2. 终端 2(可选):`npm run gdelt`(会定时跑 RSS + GDELT) +3. 执行验证脚本: + ```bash + ./scripts/verify-pipeline.sh + ``` + 若爬虫未启动想一并测爬虫,可: + ```bash + ./scripts/verify-pipeline.sh --start-crawler + ``` + 脚本会检查:API 健康、态势数据、爬虫状态、资讯表、战损字段、通知接口。 + +### 方式三:只测 RSS 抓取(不写库) + +```bash +npm run crawler:test +``` +输出为「RSS 抓取: N 条」。0 条时检查网络或 `config.py` 里 `RSS_FEEDS` / `KEYWORDS`。 + +### 常见问题 + +| 现象 | 可能原因 | +|------|----------| +| 抓取 0 条 | 网络不通、RSS 被墙、关键词无一命中 | +| `situation_update` 为空 | 去重后无新增,或未跑流水线(只跑了 `fetch_all` 未跑 `run_full_pipeline`) | +| 前端事件脉络不刷新 | 未启动 `npm run api` 或 WebSocket 未连上(需通过 Vite 代理访问前端) | +| 翻译/AI 清洗很慢或报错 | 设 `TRANSLATE_DISABLED=1` 或 `CLEANER_AI_DISABLED=1` 可跳过,用规则兜底 | + +--- + ## 故障排查 | 现象 | 可能原因 | 排查 | diff --git a/crawler/__pycache__/config.cpython-311.pyc b/crawler/__pycache__/config.cpython-311.pyc index a9256d8..f1aa1f4 100644 Binary files a/crawler/__pycache__/config.cpython-311.pyc and b/crawler/__pycache__/config.cpython-311.pyc differ diff --git a/crawler/__pycache__/config.cpython-39.pyc b/crawler/__pycache__/config.cpython-39.pyc index d85e50a..2127899 100644 Binary files a/crawler/__pycache__/config.cpython-39.pyc and b/crawler/__pycache__/config.cpython-39.pyc differ diff --git a/crawler/__pycache__/pipeline.cpython-311.pyc b/crawler/__pycache__/pipeline.cpython-311.pyc new file mode 100644 index 0000000..d06eb08 Binary files /dev/null and b/crawler/__pycache__/pipeline.cpython-311.pyc differ diff --git a/crawler/__pycache__/pipeline.cpython-39.pyc b/crawler/__pycache__/pipeline.cpython-39.pyc new file mode 100644 index 0000000..ab5dd1d Binary files /dev/null and b/crawler/__pycache__/pipeline.cpython-39.pyc differ diff --git a/crawler/__pycache__/realtime_conflict_service.cpython-39.pyc b/crawler/__pycache__/realtime_conflict_service.cpython-39.pyc index fc861f5..f3a0d8c 100644 Binary files a/crawler/__pycache__/realtime_conflict_service.cpython-39.pyc and b/crawler/__pycache__/realtime_conflict_service.cpython-39.pyc differ diff --git a/crawler/article_fetcher.py b/crawler/article_fetcher.py new file mode 100644 index 0000000..44ef3bd --- /dev/null +++ b/crawler/article_fetcher.py @@ -0,0 +1,90 @@ +# -*- coding: utf-8 -*- +""" +从文章 URL 抓取正文,供 AI 提取精确数据使用。 +RSS 仅提供标题和短摘要,正文可提供伤亡、番号、地点等具体数字与事实。 +""" +import os +import re +from typing import Optional + +# 单页超时(秒) +FETCH_TIMEOUT = int(os.environ.get("ARTICLE_FETCH_TIMEOUT", "12")) +# 正文最大字符数,避免超长输入 +MAX_BODY_CHARS = int(os.environ.get("ARTICLE_MAX_BODY_CHARS", "6000")) +# 是否启用正文抓取(设为 0 则仅用标题+摘要) +FETCH_FULL_ARTICLE = os.environ.get("FETCH_FULL_ARTICLE", "1") == "1" + + +def _strip_html(html: str) -> str: + """简单去除 HTML 标签与多余空白""" + if not html: + return "" + text = re.sub(r"]*>[\s\S]*?", " ", html, flags=re.I) + text = re.sub(r"]*>[\s\S]*?", " ", text, flags=re.I) + text = re.sub(r"<[^>]+>", " ", text) + text = re.sub(r"\s+", " ", text).strip() + return text + + +def fetch_article_body(url: str, timeout: int = FETCH_TIMEOUT) -> Optional[str]: + """ + 请求文章 URL,提取正文纯文本。失败或非 HTML 返回 None。 + 优先用 BeautifulSoup 取 main/article 或 body,否则退化为正则去标签。 + """ + if not url or not url.strip().startswith("http"): + return None + try: + import requests + headers = {"User-Agent": "US-Iran-Dashboard/1.0 (News Aggregator)"} + # 不跟随代理,避免墙内超时 + proxies = {"http": None, "https": None} if os.environ.get("CRAWLER_USE_PROXY") != "1" else None + r = requests.get(url, headers=headers, timeout=timeout, proxies=proxies) + r.raise_for_status() + ct = (r.headers.get("Content-Type") or "").lower() + if "html" not in ct and "xml" not in ct: + return None + html = r.text + if not html or len(html) < 200: + return None + try: + from bs4 import BeautifulSoup + except ImportError: + return _strip_html(html)[:MAX_BODY_CHARS] + try: + soup = BeautifulSoup(html, "html.parser") + for tag in ("article", "main", "[role='main']", ".article-body", ".post-content", ".entry-content", ".content"): + if tag.startswith((".", "[")): + node = soup.select_one(tag) + else: + node = soup.find(tag) + if node: + body = node.get_text(separator=" ", strip=True) + if len(body) > 300: + return _strip_html(body)[:MAX_BODY_CHARS] + body = soup.body.get_text(separator=" ", strip=True) if soup.body else "" + if len(body) > 300: + return _strip_html(body)[:MAX_BODY_CHARS] + except Exception: + pass + return _strip_html(html)[:MAX_BODY_CHARS] + except Exception: + return None + + +def enrich_item_with_body(item: dict, max_chars: int = MAX_BODY_CHARS) -> None: + """ + 若 item 有 url 且无 full_text,则抓取正文并写入 item["full_text"]。 + 用于 AI 提取时获得更多上下文。原地修改 item。 + """ + if not FETCH_FULL_ARTICLE: + return + url = (item.get("url") or "").strip() + if not url or item.get("full_text"): + return + body = fetch_article_body(url) + if not body: + return + title = (item.get("title") or "").strip() + summary = (item.get("summary") or "").strip() + combined = f"{title}\n{summary}\n{body}" if summary else f"{title}\n{body}" + item["full_text"] = combined[:max_chars] diff --git a/crawler/config.py b/crawler/config.py index f5bc435..ee34a26 100644 --- a/crawler/config.py +++ b/crawler/config.py @@ -16,7 +16,11 @@ DASHSCOPE_API_KEY = os.environ.get("DASHSCOPE_API_KEY", "") # 抓取间隔(秒) CRAWL_INTERVAL = int(os.environ.get("CRAWL_INTERVAL", "300")) +# 单源抓取超时(秒),避免某源卡住拖垮整轮 +FEED_TIMEOUT = int(os.environ.get("FEED_TIMEOUT", "12")) + # RSS 源:世界主流媒体,覆盖美伊/中东多视角 +# 每项为 URL 字符串,或 {"name": "显示名", "url": "..."} 便于日志与排查 RSS_FEEDS = [ # 美国 "https://feeds.reuters.com/reuters/topNews", @@ -35,6 +39,9 @@ RSS_FEEDS = [ # 中国 "https://english.news.cn/rss/world.xml", "https://www.cgtn.com/rss/world", + # 凤凰网(军事 + 国际,中文视角) + {"name": "凤凰军事", "url": "https://feedx.net/rss/ifengmil.xml"}, + {"name": "凤凰国际", "url": "https://feedx.net/rss/ifengworld.xml"}, # 伊朗 "https://www.presstv.ir/rss", # 卡塔尔(中东) @@ -42,6 +49,22 @@ RSS_FEEDS = [ "https://www.aljazeera.com/xml/rss/middleeast.xml", ] + +def get_feed_sources(): + """返回 [(name, url), ...],name 用于日志,缺省为 URL 的 host""" + import urllib.parse + out = [] + for raw in RSS_FEEDS: + if isinstance(raw, dict): + name = raw.get("name") or "rss" + url = raw.get("url", "").strip() + else: + url = (raw or "").strip() + name = urllib.parse.urlparse(url).netloc or "rss" + if url: + out.append((name, url)) + return out + # 关键词过滤:至少匹配一个才会入库(与地图区域对应:伊拉克/叙利亚/海湾/红海/地中海等) KEYWORDS = [ # 伊朗 diff --git a/crawler/db_merge.py b/crawler/db_merge.py index a23326f..380d055 100644 --- a/crawler/db_merge.py +++ b/crawler/db_merge.py @@ -12,6 +12,23 @@ from typing import Any, Dict, Optional PROJECT_ROOT = Path(__file__).resolve().parent.parent DB_PATH = os.environ.get("DB_PATH", str(PROJECT_ROOT / "server" / "data.db")) +# 单次合并时各字段增量的上限,防止误把「累计总数」当增量导致数据剧增(可选,设为 0 表示不设限) +MAX_DELTA_PER_MERGE = { + "personnel_killed": 500, "personnel_wounded": 1000, "civilian_killed": 300, "civilian_wounded": 500, + "bases_destroyed": 5, "bases_damaged": 10, + "aircraft": 50, "warships": 10, "armor": 30, "vehicles": 100, + "drones": 50, "missiles": 200, "helicopters": 20, "submarines": 5, "carriers": 2, + "civilian_ships": 20, "airport_port": 10, +} + + +def _clamp_delta(key: str, value: int) -> int: + """单次增量上限,避免误提「累计」导致波动""" + cap = MAX_DELTA_PER_MERGE.get(key, 0) + if cap <= 0: + return max(0, value) + return max(0, min(value, cap)) + def _ensure_tables(conn: sqlite3.Connection) -> None: """确保所需表存在(与 db.js 一致)""" @@ -41,7 +58,7 @@ def _ensure_tables(conn: sqlite3.Connection) -> None: conn.execute("ALTER TABLE combat_losses ADD COLUMN updated_at TEXT DEFAULT (datetime('now'))") except sqlite3.OperationalError: pass - for col in ("drones", "missiles", "helicopters", "submarines", "tanks", "civilian_ships", "airport_port"): + for col in ("drones", "missiles", "helicopters", "submarines", "tanks", "carriers", "civilian_ships", "airport_port"): try: conn.execute(f"ALTER TABLE combat_losses ADD COLUMN {col} INTEGER NOT NULL DEFAULT 0") except sqlite3.OperationalError: @@ -72,19 +89,19 @@ def merge(extracted: Dict[str, Any], db_path: Optional[str] = None) -> bool: ) if conn.total_changes > 0: updated = True - # combat_losses:增量叠加到当前值,无行则先插入初始行 + # combat_losses:统一按增量处理。AI 输出为本则报道的新增数,此处叠加到库内当前值,避免把「累计总数」当增量导致数据波动。 if "combat_losses_delta" in extracted: for side, delta in extracted["combat_losses_delta"].items(): if side not in ("us", "iran"): continue try: row = conn.execute( - "SELECT personnel_killed,personnel_wounded,civilian_killed,civilian_wounded,bases_destroyed,bases_damaged,aircraft,warships,armor,vehicles,drones,missiles,helicopters,submarines,tanks,civilian_ships,airport_port FROM combat_losses WHERE side = ?", + "SELECT personnel_killed,personnel_wounded,civilian_killed,civilian_wounded,bases_destroyed,bases_damaged,aircraft,warships,armor,vehicles,drones,missiles,helicopters,submarines,tanks,carriers,civilian_ships,airport_port FROM combat_losses WHERE side = ?", (side,), ).fetchone() cur = {"personnel_killed": 0, "personnel_wounded": 0, "civilian_killed": 0, "civilian_wounded": 0, "bases_destroyed": 0, "bases_damaged": 0, "aircraft": 0, "warships": 0, "armor": 0, "vehicles": 0, - "drones": 0, "missiles": 0, "helicopters": 0, "submarines": 0, "tanks": 0, "civilian_ships": 0, "airport_port": 0} + "drones": 0, "missiles": 0, "helicopters": 0, "submarines": 0, "tanks": 0, "carriers": 0, "civilian_ships": 0, "airport_port": 0} if row: cur = { "personnel_killed": row[0], "personnel_wounded": row[1], "civilian_killed": row[2] or 0, @@ -92,38 +109,39 @@ def merge(extracted: Dict[str, Any], db_path: Optional[str] = None) -> bool: "aircraft": row[6], "warships": row[7], "armor": row[8], "vehicles": row[9], "drones": row[10] if len(row) > 10 else 0, "missiles": row[11] if len(row) > 11 else 0, "helicopters": row[12] if len(row) > 12 else 0, "submarines": row[13] if len(row) > 13 else 0, - "tanks": row[14] if len(row) > 14 else 0, "civilian_ships": row[15] if len(row) > 15 else 0, "airport_port": row[16] if len(row) > 16 else 0, + "tanks": row[14] if len(row) > 14 else 0, "carriers": row[15] if len(row) > 15 else (row[14] if len(row) > 14 else 0), + "civilian_ships": row[16] if len(row) > 16 else 0, "airport_port": row[17] if len(row) > 17 else 0, } - pk = max(0, (cur["personnel_killed"] or 0) + delta.get("personnel_killed", 0)) - pw = max(0, (cur["personnel_wounded"] or 0) + delta.get("personnel_wounded", 0)) - ck = max(0, (cur["civilian_killed"] or 0) + delta.get("civilian_killed", 0)) - cw = max(0, (cur["civilian_wounded"] or 0) + delta.get("civilian_wounded", 0)) - bd = max(0, (cur["bases_destroyed"] or 0) + delta.get("bases_destroyed", 0)) - bm = max(0, (cur["bases_damaged"] or 0) + delta.get("bases_damaged", 0)) - ac = max(0, (cur["aircraft"] or 0) + delta.get("aircraft", 0)) - ws = max(0, (cur["warships"] or 0) + delta.get("warships", 0)) - ar = max(0, (cur["armor"] or 0) + delta.get("armor", 0)) - vh = max(0, (cur["vehicles"] or 0) + delta.get("vehicles", 0)) - dr = max(0, (cur["drones"] or 0) + delta.get("drones", 0)) - ms = max(0, (cur["missiles"] or 0) + delta.get("missiles", 0)) - hp = max(0, (cur["helicopters"] or 0) + delta.get("helicopters", 0)) - sb = max(0, (cur["submarines"] or 0) + delta.get("submarines", 0)) - tk = max(0, (cur["tanks"] or 0) + delta.get("tanks", 0)) - cs = max(0, (cur["civilian_ships"] or 0) + delta.get("civilian_ships", 0)) - ap = max(0, (cur["airport_port"] or 0) + delta.get("airport_port", 0)) + pk = max(0, (cur["personnel_killed"] or 0) + _clamp_delta("personnel_killed", delta.get("personnel_killed", 0))) + pw = max(0, (cur["personnel_wounded"] or 0) + _clamp_delta("personnel_wounded", delta.get("personnel_wounded", 0))) + ck = max(0, (cur["civilian_killed"] or 0) + _clamp_delta("civilian_killed", delta.get("civilian_killed", 0))) + cw = max(0, (cur["civilian_wounded"] or 0) + _clamp_delta("civilian_wounded", delta.get("civilian_wounded", 0))) + bd = max(0, (cur["bases_destroyed"] or 0) + _clamp_delta("bases_destroyed", delta.get("bases_destroyed", 0))) + bm = max(0, (cur["bases_damaged"] or 0) + _clamp_delta("bases_damaged", delta.get("bases_damaged", 0))) + ac = max(0, (cur["aircraft"] or 0) + _clamp_delta("aircraft", delta.get("aircraft", 0))) + ws = max(0, (cur["warships"] or 0) + _clamp_delta("warships", delta.get("warships", 0))) + ar = max(0, (cur["armor"] or 0) + _clamp_delta("armor", delta.get("armor", 0))) + vh = max(0, (cur["vehicles"] or 0) + _clamp_delta("vehicles", delta.get("vehicles", 0))) + dr = max(0, (cur["drones"] or 0) + _clamp_delta("drones", delta.get("drones", 0))) + ms = max(0, (cur["missiles"] or 0) + _clamp_delta("missiles", delta.get("missiles", 0))) + hp = max(0, (cur["helicopters"] or 0) + _clamp_delta("helicopters", delta.get("helicopters", 0))) + sb = max(0, (cur["submarines"] or 0) + _clamp_delta("submarines", delta.get("submarines", 0))) + cr = max(0, (cur["carriers"] or 0) + _clamp_delta("carriers", delta.get("carriers", 0))) + cs = max(0, (cur["civilian_ships"] or 0) + _clamp_delta("civilian_ships", delta.get("civilian_ships", 0))) + ap = max(0, (cur["airport_port"] or 0) + _clamp_delta("airport_port", delta.get("airport_port", 0))) ts = datetime.utcnow().strftime("%Y-%m-%dT%H:%M:%S.000Z") if row: conn.execute( """UPDATE combat_losses SET personnel_killed=?, personnel_wounded=?, civilian_killed=?, civilian_wounded=?, bases_destroyed=?, bases_damaged=?, aircraft=?, warships=?, armor=?, vehicles=?, - drones=?, missiles=?, helicopters=?, submarines=?, tanks=?, civilian_ships=?, airport_port=?, updated_at=? WHERE side=?""", - (pk, pw, ck, cw, bd, bm, ac, ws, ar, vh, dr, ms, hp, sb, tk, cs, ap, ts, side), + drones=?, missiles=?, helicopters=?, submarines=?, tanks=?, carriers=?, civilian_ships=?, airport_port=?, updated_at=? WHERE side=?""", + (pk, pw, ck, cw, bd, bm, ac, ws, ar, vh, dr, ms, hp, sb, cur.get("tanks", 0), cr, cs, ap, ts, side), ) else: conn.execute( """INSERT OR REPLACE INTO combat_losses (side, personnel_killed, personnel_wounded, civilian_killed, civilian_wounded, - bases_destroyed, bases_damaged, aircraft, warships, armor, vehicles, drones, missiles, helicopters, submarines, tanks, civilian_ships, airport_port, updated_at) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)""", - (side, pk, pw, ck, cw, bd, bm, ac, ws, ar, vh, dr, ms, hp, sb, tk, cs, ap, ts), + bases_destroyed, bases_damaged, aircraft, warships, armor, vehicles, drones, missiles, helicopters, submarines, tanks, carriers, civilian_ships, airport_port, updated_at) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)""", + (side, pk, pw, ck, cw, bd, bm, ac, ws, ar, vh, dr, ms, hp, sb, 0, cr, cs, ap, ts), ) if conn.total_changes > 0: updated = True @@ -140,7 +158,7 @@ def merge(extracted: Dict[str, Any], db_path: Optional[str] = None) -> bool: w = extracted["wall_street"] conn.execute("INSERT INTO wall_street_trend (time, value) VALUES (?, ?)", (w["time"], w["value"])) updated = True - # key_location:更新受袭基地 status/damage_level + # key_location:更新双方攻击地点(美军基地被打击 side=us,伊朗设施被打击 side=iran)的 status/damage_level if "key_location_updates" in extracted: try: for u in extracted["key_location_updates"]: diff --git a/crawler/extractor_ai.py b/crawler/extractor_ai.py index 05c43a5..8dc443b 100644 --- a/crawler/extractor_ai.py +++ b/crawler/extractor_ai.py @@ -15,31 +15,40 @@ CLEANER_AI_DISABLED = os.environ.get("CLEANER_AI_DISABLED", "0") == "1" OLLAMA_MODEL = os.environ.get("OLLAMA_MODEL", "llama3.1") -def _call_ollama_extract(text: str, timeout: int = 10) -> Optional[Dict[str, Any]]: - """调用 Ollama 提取结构化数据。输出 JSON,仅包含新闻中可明确推断的字段""" +# 用于 AI 提取的原文最大长度(有正文时取更长以提取精确数据) +EXTRACT_TEXT_MAX_LEN = int(os.environ.get("EXTRACT_TEXT_MAX_LEN", "4000")) + + +def _call_ollama_extract(text: str, timeout: int = 15) -> Optional[Dict[str, Any]]: + """调用 Ollama 从新闻全文/摘要中提取精确结构化数据,仅填写报道中明确给出的数字与事实。""" if CLEANER_AI_DISABLED or not text or len(str(text).strip()) < 10: return None try: import requests - prompt = f"""从以下美伊/中东新闻中提取可推断的数值,输出 JSON,仅包含有明确依据的字段。无依据则省略该字段。 + raw = str(text).strip()[:EXTRACT_TEXT_MAX_LEN] + prompt = f"""从以下美伊/中东新闻**全文或摘要**中,提取**报道明确给出的数字与事实**,输出 JSON。规则: +1. 仅填写报道中**直接出现、可核对**的数据,不要推测或估算。 +2. 无明确依据的字段**必须省略**,不要填 0 或猜。 +3. **战损一律按增量**:只填本则报道中「本次/此次/今日/本轮」**新增**的伤亡或损毁数量。若报道只给「累计总数」「迄今共」「total so far」等,**不要填写**该字段(避免与库内已有累计值重复叠加)。 +4. **攻击地点**:提取双方遭袭的具体地点。美军/盟军基地被打击 → side=us;伊朗/亲伊设施被打击 → side=iran。name_keywords 用「中文名|英文名」便于匹配,可填多处。 -要求: -- summary: 1-2句中文事实,≤80字 +字段说明: +- summary: 1-2 句中文事实概括,≤80 字 - category: deployment|alert|intel|diplomatic|other - severity: low|medium|high|critical -- 战损(仅当新闻明确提及数字时填写,格式 us_XXX / iran_XXX): +- 战损(**仅填本则报道的新增增量**,如「此次 5 人丧生」「今日又损 2 架」): us_personnel_killed, iran_personnel_killed, us_personnel_wounded, iran_personnel_wounded, us_civilian_killed, iran_civilian_killed, us_civilian_wounded, iran_civilian_wounded, - us_bases_destroyed, iran_bases_destroyed, us_bases_damaged, iran_bases_damaged. - 重要:bases_* 仅指已确认损毁/受损的基地数量;"军事目标"/targets 等泛指不是基地,若报道只说"X个军事目标遭袭"而无具体基地名,不填写 bases_* + us_bases_destroyed, iran_bases_destroyed, us_bases_damaged, iran_bases_damaged, us_aircraft, iran_aircraft, us_warships, iran_warships, us_armor, iran_armor, us_vehicles, iran_vehicles, us_drones, iran_drones, us_missiles, iran_missiles, us_helicopters, iran_helicopters, us_submarines, iran_submarines, - us_tanks, iran_tanks, us_civilian_ships, iran_civilian_ships, us_airport_port, iran_airport_port -- retaliation_sentiment: 0-100,仅当新闻涉及伊朗报复情绪时 -- wall_street_value: 0-100,仅当新闻涉及美股/市场反应时 -- key_location_updates: 当新闻提及具体基地/地点遭袭时,数组项 { "name_keywords": "asad|阿萨德|assad", "side": "us", "status": "attacked", "damage_level": 1-3 } + us_carriers, iran_carriers, us_civilian_ships, iran_civilian_ships, us_airport_port, iran_airport_port +- retaliation_sentiment: 0-100,仅当报道涉及伊朗报复/反击情绪时 +- wall_street_value: 0-100,仅当报道涉及美股/市场时 +- key_location_updates: **双方攻击地点**。每项 {{ "name_keywords": "阿萨德|asad|al-asad", "side": "us或iran(被打击方)", "status": "attacked", "damage_level": 1-3 }}。美军基地例:阿萨德|asad、乌代德|udeid、埃尔比勒|erbil、因吉尔利克|incirlik。伊朗例:德黑兰|tehran、布什尔|bushehr、伊斯法罕|isfahan、阿巴斯|abbas、纳坦兹|natanz -原文:{str(text)[:800]} +原文: +{raw} 直接输出 JSON,不要解释:""" r = requests.post( @@ -48,7 +57,7 @@ def _call_ollama_extract(text: str, timeout: int = 10) -> Optional[Dict[str, Any "model": OLLAMA_MODEL, "messages": [{"role": "user", "content": prompt}], "stream": False, - "options": {"num_predict": 256}, + "options": {"num_predict": 384}, }, timeout=timeout, ) @@ -82,7 +91,7 @@ def extract_from_news(text: str, timestamp: Optional[str] = None) -> Dict[str, A # combat_losses 增量(仅数字字段) loss_us = {} loss_ir = {} - for k in ["personnel_killed", "personnel_wounded", "civilian_killed", "civilian_wounded", "bases_destroyed", "bases_damaged", "aircraft", "warships", "armor", "vehicles", "drones", "missiles", "helicopters", "submarines", "tanks", "civilian_ships", "airport_port"]: + for k in ["personnel_killed", "personnel_wounded", "civilian_killed", "civilian_wounded", "bases_destroyed", "bases_damaged", "aircraft", "warships", "armor", "vehicles", "drones", "missiles", "helicopters", "submarines", "carriers", "civilian_ships", "airport_port"]: uk = f"us_{k}" ik = f"iran_{k}" if uk in parsed and isinstance(parsed[uk], (int, float)): diff --git a/crawler/extractor_dashscope.py b/crawler/extractor_dashscope.py index 439e9b9..9001ba8 100644 --- a/crawler/extractor_dashscope.py +++ b/crawler/extractor_dashscope.py @@ -13,8 +13,11 @@ from typing import Any, Dict, Optional from panel_schema import validate_category, validate_severity, validate_summary +EXTRACT_TEXT_MAX_LEN = int(os.environ.get("EXTRACT_TEXT_MAX_LEN", "4000")) + + def _call_dashscope_extract(text: str, timeout: int = 15) -> Optional[Dict[str, Any]]: - """调用阿里云 DashScope 提取结构化数据""" + """调用阿里云 DashScope 从新闻全文中提取精确结构化数据,仅填写报道明确给出的数字与事实。""" api_key = os.environ.get("DASHSCOPE_API_KEY", "").strip() if not api_key or not text or len(str(text).strip()) < 10: return None @@ -23,27 +26,25 @@ def _call_dashscope_extract(text: str, timeout: int = 15) -> Optional[Dict[str, from http import HTTPStatus dashscope.api_key = api_key + raw = str(text).strip()[:EXTRACT_TEXT_MAX_LEN] - prompt = f"""从以下美伊/中东军事新闻中提取可明确推断的数值,输出 JSON。无依据的字段省略不写。 + prompt = f"""从以下美伊/中东新闻**全文或摘要**中,提取**报道明确给出的数字与事实**,输出 JSON。规则: +1. 仅填写报道中**直接出现、可核对**的数据,不要推测或估算。 +2. 无明确依据的字段**必须省略**,不要填 0 或猜。 +3. **战损一律按增量**:只填本则报道中「本次/此次/今日」**新增**数量。报道若只给「累计总数」「迄今共」**不要填**该字段。 +4. **攻击地点**:提取双方遭袭地点。美军/盟军基地被打击 → side=us;伊朗/亲伊设施被打击 → side=iran。name_keywords 用「中文|英文」,可填多处。 -要求: -- summary: 1-2句中文事实摘要,≤80字 +字段: +- summary: 1-2 句中文事实概括,≤80 字 - category: deployment|alert|intel|diplomatic|other - severity: low|medium|high|critical -- 战损(仅当新闻明确提及数字时填写): - us_personnel_killed, iran_personnel_killed, us_personnel_wounded, iran_personnel_wounded, - us_civilian_killed, iran_civilian_killed, us_civilian_wounded, iran_civilian_wounded, - us_bases_destroyed, iran_bases_destroyed, us_bases_damaged, iran_bases_damaged. - 重要:bases_* 仅指已确认损毁/受损的基地数量;"军事目标"/"targets"等泛指不是基地,若报道只说"X个军事目标遭袭"而无具体基地名,不填写 bases_* - us_aircraft, iran_aircraft, us_warships, iran_warships, us_armor, iran_armor, us_vehicles, iran_vehicles, - us_drones, iran_drones, us_missiles, iran_missiles, us_helicopters, iran_helicopters, us_submarines, iran_submarines, - us_tanks, iran_tanks, us_civilian_ships, iran_civilian_ships, us_airport_port, iran_airport_port -- retaliation_sentiment: 0-100,仅当新闻涉及伊朗报复/反击情绪时 -- wall_street_value: 0-100,仅当新闻涉及美股/市场反应时 -- key_location_updates: 当新闻提及具体基地/设施遭袭时必填,数组 [{{"name_keywords":"阿萨德|asad|assad|阿因","side":"us","status":"attacked","damage_level":1-3}}]。常用关键词:阿萨德|asad|巴格达|baghdad|乌代德|udeid|埃尔比勒|erbil|因吉尔利克|incirlik|德黑兰|tehran|阿巴斯|abbas|布什尔|bushehr|伊斯法罕|isfahan|纳坦兹|natanz +- 战损(**仅填本则报道的新增增量**): us_personnel_killed, iran_personnel_killed, us_personnel_wounded, iran_personnel_wounded, us_civilian_killed, iran_civilian_killed, us_civilian_wounded, iran_civilian_wounded, us_bases_destroyed, iran_bases_destroyed, us_bases_damaged, iran_bases_damaged, us_aircraft, iran_aircraft, us_warships, iran_warships, us_armor, iran_armor, us_vehicles, iran_vehicles, us_drones, iran_drones, us_missiles, iran_missiles, us_helicopters, iran_helicopters, us_submarines, iran_submarines, us_carriers, iran_carriers, us_civilian_ships, iran_civilian_ships, us_airport_port, iran_airport_port +- retaliation_sentiment: 0-100(仅当报道涉及伊朗报复情绪时) +- wall_street_value: 0-100(仅当报道涉及美股/市场时) +- key_location_updates: **双方攻击地点**。每项 {{"name_keywords":"阿萨德|asad","side":"us或iran(被打击方)","status":"attacked","damage_level":1-3}}。美军基地:阿萨德|asad、乌代德|udeid、埃尔比勒|erbil、因吉尔利克|incirlik。伊朗:德黑兰|tehran、布什尔|bushehr、伊斯法罕|isfahan、阿巴斯|abbas、纳坦兹|natanz 原文: -{str(text)[:800]} +{raw} 直接输出 JSON,不要其他解释:""" @@ -86,7 +87,7 @@ def extract_from_news(text: str, timestamp: Optional[str] = None) -> Dict[str, A loss_ir = {} for k in ["personnel_killed", "personnel_wounded", "civilian_killed", "civilian_wounded", "bases_destroyed", "bases_damaged", "aircraft", "warships", "armor", "vehicles", - "drones", "missiles", "helicopters", "submarines", "tanks", "civilian_ships", "airport_port"]: + "drones", "missiles", "helicopters", "submarines", "carriers", "civilian_ships", "airport_port"]: uk, ik = f"us_{k}", f"iran_{k}" if uk in parsed and isinstance(parsed[uk], (int, float)): loss_us[k] = max(0, int(parsed[uk])) diff --git a/crawler/extractor_rules.py b/crawler/extractor_rules.py index 0f04164..4414897 100644 --- a/crawler/extractor_rules.py +++ b/crawler/extractor_rules.py @@ -172,13 +172,13 @@ def extract_from_news(text: str, timestamp: Optional[str] = None) -> Dict[str, A else: loss_us["submarines"] = v - # 坦克 tank / 坦克 - v = _first_int(t, r"(\d+)[\s\w]*(?:tank|坦克)[\s\w]*(?:destroyed|damaged|lost|hit|摧毁|损毁|击毁)") + # 航母 carrier / 航空母舰 / 航母 + v = _first_int(t, r"(\d+)[\s\w]*(?:carrier|aircraft\s*carrier|航母|航空母舰)[\s\w]*(?:destroyed|damaged|lost|hit|sunk|摧毁|损毁|击毁|沉没)") if v is not None: if "iran" in t or "iranian" in t: - loss_ir["tanks"] = v + loss_ir["carriers"] = v else: - loss_us["tanks"] = v + loss_us["carriers"] = v # 民船 civilian ship / 商船 / 民船 v = _first_int(t, r"(\d+)[\s\w]*(?:civilian\s*ship|merchant|商船|民船)[\s\w]*(?:sunk|damaged|hit|击沉|受损)") diff --git a/crawler/main.py b/crawler/main.py index 3dfc6ab..0414aa4 100644 --- a/crawler/main.py +++ b/crawler/main.py @@ -1,5 +1,5 @@ # -*- coding: utf-8 -*- -"""爬虫入口:定时抓取 → 解析 → 入库 → 通知 API""" +"""爬虫入口:定时执行完整写库流水线(抓取 → 清洗 → 去重 → 映射 → 更新表 → 通知 API)""" import time import sys from pathlib import Path @@ -8,34 +8,18 @@ from pathlib import Path sys.path.insert(0, str(Path(__file__).resolve().parent)) from config import DB_PATH, API_BASE, CRAWL_INTERVAL -from scrapers.rss_scraper import fetch_all -from db_writer import write_updates - - -def notify_api() -> bool: - """调用 Node API 触发立即广播""" - try: - import urllib.request - req = urllib.request.Request( - f"{API_BASE}/api/crawler/notify", - method="POST", - headers={"Content-Type": "application/json"}, - ) - with urllib.request.urlopen(req, timeout=5) as resp: - return resp.status == 200 - except Exception as e: - print(f" [warn] notify API failed: {e}") - return False +from pipeline import run_full_pipeline def run_once() -> int: - items = fetch_all() - if not items: - return 0 - n = write_updates(items) - if n > 0: - notify_api() - return n + """执行一轮:抓取、清洗、去重、映射、写表、通知。返回本轮新增条数(面板或资讯)。""" + n_fetched, n_news, n_panel = run_full_pipeline( + db_path=DB_PATH, + api_base=API_BASE, + translate=True, + notify=True, + ) + return n_panel or n_news def main() -> None: @@ -45,7 +29,7 @@ def main() -> None: try: n = run_once() if n > 0: - print(f"[{time.strftime('%H:%M:%S')}] Inserted {n} new update(s)") + print(f"[{time.strftime('%H:%M:%S')}] 抓取完成,去重后新增 {n} 条,已写库并通知 API") except KeyboardInterrupt: break except Exception as e: diff --git a/crawler/panel_schema.py b/crawler/panel_schema.py index d003beb..e9a1ac5 100644 --- a/crawler/panel_schema.py +++ b/crawler/panel_schema.py @@ -19,7 +19,7 @@ TimeSeriesPoint = Tuple[str, int] # (ISO time, value) # AI 可从新闻中提取的字段 EXTRACTABLE_FIELDS = { "situation_update": ["summary", "category", "severity", "timestamp"], - "combat_losses": ["personnel_killed", "personnel_wounded", "civilian_killed", "civilian_wounded", "bases_destroyed", "bases_damaged", "aircraft", "warships", "armor", "vehicles", "drones", "missiles", "helicopters", "submarines", "tanks", "civilian_ships", "airport_port"], + "combat_losses": ["personnel_killed", "personnel_wounded", "civilian_killed", "civilian_wounded", "bases_destroyed", "bases_damaged", "aircraft", "warships", "armor", "vehicles", "drones", "missiles", "helicopters", "submarines", "tanks", "carriers", "civilian_ships", "airport_port"], "retaliation": ["value"], # 0-100 "wall_street_trend": ["time", "value"], # 0-100 "conflict_stats": ["estimated_casualties", "estimated_strike_count"], diff --git a/crawler/pipeline.py b/crawler/pipeline.py new file mode 100644 index 0000000..df8494e --- /dev/null +++ b/crawler/pipeline.py @@ -0,0 +1,150 @@ +# -*- coding: utf-8 -*- +""" +统一写库流水线:抓取 → 清洗 → 去重 → 映射到前端库字段 → 更新表 → 通知 +与 server/README.md 第五节「爬虫侧写库链路」一致,供 main.py 与 realtime_conflict_service 共用。 +""" +import os +from datetime import datetime, timezone +from typing import Callable, Optional, Tuple + +from config import DB_PATH, API_BASE + + +def _notify_api(api_base: str) -> bool: + """调用 Node API 触发立即广播""" + try: + import urllib.request + req = urllib.request.Request( + f"{api_base.rstrip('/')}/api/crawler/notify", + method="POST", + headers={"Content-Type": "application/json"}, + ) + with urllib.request.urlopen(req, timeout=5) as resp: + return resp.status == 200 + except Exception as e: + print(f" [warn] notify API failed: {e}") + return False + + +def _extract_and_merge(items: list, db_path: str) -> bool: + """AI 从新闻全文或标题+摘要中提取精确结构化数据,合并到 combat_losses / key_location 等表。""" + if not items or not os.path.exists(db_path): + return False + try: + from db_merge import merge + use_dashscope = bool(os.environ.get("DASHSCOPE_API_KEY", "").strip()) + if use_dashscope: + from extractor_dashscope import extract_from_news + limit = 10 + elif os.environ.get("CLEANER_AI_DISABLED", "0") == "1": + from extractor_rules import extract_from_news + limit = 25 + else: + from extractor_ai import extract_from_news + limit = 10 + merged_any = False + for it in items[:limit]: + # 优先用正文(article_fetcher 抓取),否则用标题+摘要,供 AI 提取精确数字 + text = it.get("full_text") or ((it.get("title", "") or "") + " " + (it.get("summary", "") or "")) + if len(text.strip()) < 20: + continue + pub = it.get("published") + ts = None + if pub: + try: + if isinstance(pub, str): + pub_dt = datetime.fromisoformat(pub.replace("Z", "+00:00")) + else: + pub_dt = pub + if pub_dt.tzinfo: + pub_dt = pub_dt.astimezone(timezone.utc) + ts = pub_dt.strftime("%Y-%m-%dT%H:%M:%S.000Z") + except Exception: + pass + extracted = extract_from_news(text, timestamp=ts) + if extracted and merge(extracted, db_path=db_path): + merged_any = True + return merged_any + except Exception as e: + print(f" [warn] AI 面板数据提取/合并: {e}") + return False + + +def run_full_pipeline( + db_path: Optional[str] = None, + api_base: Optional[str] = None, + *, + translate: bool = True, + notify: bool = True, + on_notify: Optional[Callable[[], None]] = None, +) -> Tuple[int, int, int]: + """ + 执行完整写库链路: + 1. 爬虫抓取实时数据 + 2. AI 清洗(标题/摘要/分类)→ 有效数据 + 3. 去重(news_content content_hash)→ 仅新项进入后续 + 4. 有效数据映射到前端库字段(situation_update、news_content、combat_losses 等) + 5. 更新数据库表;若有更新则通知后端 + + translate: 是否对标题/摘要做翻译(英→中) + notify: 是否在流水线末尾调用 POST /api/crawler/notify + on_notify: 若提供,在通知前调用(供 gdelt 服务做 GDELT 回填等) + + 返回: (本轮抓取条数, 去重后新增资讯数, 写入 situation_update 条数) + """ + path = db_path or DB_PATH + base = api_base or API_BASE + + from scrapers.rss_scraper import fetch_all + from db_writer import write_updates + from news_storage import save_and_dedup + from cleaner_ai import clean_news_for_panel, ensure_category, ensure_severity + + # 1. 抓取 + items = fetch_all() + if not items: + return 0, 0, 0 + + # 2. 清洗(标题/摘要/分类,符合面板 schema) + if translate: + from translate_utils import translate_to_chinese + for it in items: + raw_title = translate_to_chinese(it.get("title", "") or "") + raw_summary = translate_to_chinese(it.get("summary", "") or it.get("title", "")) + it["title"] = clean_news_for_panel(raw_title, max_len=80) + it["summary"] = clean_news_for_panel(raw_summary or raw_title, max_len=120) + else: + for it in items: + it["title"] = clean_news_for_panel(it.get("title", "") or "", max_len=80) + it["summary"] = clean_news_for_panel(it.get("summary", "") or it.get("title", ""), max_len=120) + for it in items: + it["category"] = ensure_category(it.get("category", "other")) + it["severity"] = ensure_severity(it.get("severity", "medium")) + it["source"] = it.get("source") or "rss" + + # 3. 去重:落库 news_content,仅新项返回 + new_items, n_news = save_and_dedup(items, db_path=path) + + # 3.5 数据增强:为参与 AI 提取的条目抓取正文,便于从全文提取精确数据(伤亡、基地等) + if new_items: + try: + from article_fetcher import enrich_item_with_body + # 仅对前若干条抓取正文,避免单轮请求过多 + enrich_limit = int(os.environ.get("ARTICLE_FETCH_LIMIT", "10")) + for it in new_items[:enrich_limit]: + enrich_item_with_body(it) + except Exception as e: + print(f" [warn] 正文抓取: {e}") + + # 4. 映射到前端库字段并更新表 + n_panel = write_updates(new_items) if new_items else 0 + if new_items: + _extract_and_merge(new_items, path) + + # 5. 通知(有新增时才通知;可选:先执行外部逻辑如 GDELT 回填,再通知) + if on_notify: + on_notify() + if notify and (n_panel > 0 or n_news > 0): + _notify_api(base) + + return len(items), n_news, n_panel diff --git a/crawler/realtime_conflict_service.py b/crawler/realtime_conflict_service.py index 5e4e626..ea46f00 100644 --- a/crawler/realtime_conflict_service.py +++ b/crawler/realtime_conflict_service.py @@ -283,93 +283,34 @@ def _rss_to_gdelt_fallback() -> None: # ========================== -# RSS 新闻抓取:资讯落库(去重) → AI 提取 → 面板数据落库 → 通知前端 +# RSS 新闻抓取:使用统一流水线(抓取 → 清洗 → 去重 → 映射 → 写表 → 通知) # ========================== LAST_FETCH = {"items": 0, "inserted": 0, "error": None} def fetch_news() -> None: + """执行完整写库流水线;GDELT 禁用时用 RSS 回填 gdelt_events,再通知 Node。""" try: - from scrapers.rss_scraper import fetch_all - from db_writer import write_updates - from news_storage import save_and_dedup - from translate_utils import translate_to_chinese - from cleaner_ai import clean_news_for_panel - from cleaner_ai import ensure_category, ensure_severity + from pipeline import run_full_pipeline LAST_FETCH["error"] = None - items = fetch_all() - for it in items: - raw_title = translate_to_chinese(it.get("title", "") or "") - raw_summary = translate_to_chinese(it.get("summary", "") or it.get("title", "")) - it["title"] = clean_news_for_panel(raw_title, max_len=80) - it["summary"] = clean_news_for_panel(raw_summary or raw_title, max_len=120) - it["category"] = ensure_category(it.get("category", "other")) - it["severity"] = ensure_severity(it.get("severity", "medium")) - it["source"] = it.get("source") or "rss" - # 1. 历史去重:资讯内容落库 news_content(独立表,便于后续消费) - new_items, n_news = save_and_dedup(items, db_path=DB_PATH) - # 2. 面板展示:新增资讯写入 situation_update(供前端 recentUpdates) - n_panel = write_updates(new_items) if new_items else 0 - LAST_FETCH["items"] = len(items) + n_fetched, n_news, n_panel = run_full_pipeline( + db_path=DB_PATH, + api_base=API_BASE, + translate=True, + notify=False, + ) + LAST_FETCH["items"] = n_fetched LAST_FETCH["inserted"] = n_news - # 3. AI 提取 + 合并到 combat_losses / key_location 等 - if new_items: - _extract_and_merge_panel_data(new_items) - # GDELT 禁用时用 RSS 填充 gdelt_events,使地图有冲突点 if GDELT_DISABLED: _rss_to_gdelt_fallback() _notify_node() - print(f"[{datetime.now().strftime('%H:%M:%S')}] RSS 抓取 {len(items)} 条,去重后新增 {n_news} 条资讯,面板 {n_panel} 条") + if n_fetched > 0: + print(f"[{datetime.now().strftime('%H:%M:%S')}] RSS 抓取 {n_fetched} 条,去重后新增 {n_news} 条资讯,面板 {n_panel} 条") except Exception as e: LAST_FETCH["error"] = str(e) print(f"[{datetime.now().strftime('%H:%M:%S')}] 新闻抓取失败: {e}") -def _extract_and_merge_panel_data(items: list) -> None: - """AI 分析提取面板相关数据,清洗后落库""" - if not items or not os.path.exists(DB_PATH): - return - try: - from db_merge import merge - use_dashscope = bool(os.environ.get("DASHSCOPE_API_KEY", "").strip()) - if use_dashscope: - from extractor_dashscope import extract_from_news - limit = 10 - elif os.environ.get("CLEANER_AI_DISABLED", "0") == "1": - from extractor_rules import extract_from_news - limit = 25 - else: - from extractor_ai import extract_from_news - limit = 10 - from datetime import timezone - merged_any = False - for it in items[:limit]: - text = (it.get("title", "") or "") + " " + (it.get("summary", "") or "") - if len(text.strip()) < 20: - continue - pub = it.get("published") - ts = None - if pub: - try: - if isinstance(pub, str): - pub_dt = datetime.fromisoformat(pub.replace("Z", "+00:00")) - else: - pub_dt = pub - if pub_dt.tzinfo: - pub_dt = pub_dt.astimezone(timezone.utc) - ts = pub_dt.strftime("%Y-%m-%dT%H:%M:%S.000Z") - except Exception: - pass - extracted = extract_from_news(text, timestamp=ts) - if extracted: - if merge(extracted, db_path=DB_PATH): - merged_any = True - if merged_any: - _notify_node() - except Exception as e: - print(f" [warn] AI 面板数据提取/合并: {e}") - - # ========================== # 定时任务(asyncio 后台任务,避免 APScheduler executor 关闭竞态) # ========================== diff --git a/crawler/requirements.txt b/crawler/requirements.txt index 427768e..bc93781 100644 --- a/crawler/requirements.txt +++ b/crawler/requirements.txt @@ -1,5 +1,6 @@ requests>=2.31.0 feedparser>=6.0.0 +beautifulsoup4>=4.12.0 pytest>=7.0.0 fastapi>=0.109.0 uvicorn>=0.27.0 diff --git a/crawler/scrapers/__pycache__/rss_scraper.cpython-311.pyc b/crawler/scrapers/__pycache__/rss_scraper.cpython-311.pyc index 5882ceb..50257ba 100644 Binary files a/crawler/scrapers/__pycache__/rss_scraper.cpython-311.pyc and b/crawler/scrapers/__pycache__/rss_scraper.cpython-311.pyc differ diff --git a/crawler/scrapers/__pycache__/rss_scraper.cpython-39.pyc b/crawler/scrapers/__pycache__/rss_scraper.cpython-39.pyc index b0f243d..669cca2 100644 Binary files a/crawler/scrapers/__pycache__/rss_scraper.cpython-39.pyc and b/crawler/scrapers/__pycache__/rss_scraper.cpython-39.pyc differ diff --git a/crawler/scrapers/rss_scraper.py b/crawler/scrapers/rss_scraper.py index 9af4da7..b00b30d 100644 --- a/crawler/scrapers/rss_scraper.py +++ b/crawler/scrapers/rss_scraper.py @@ -1,11 +1,12 @@ # -*- coding: utf-8 -*- -"""RSS 抓取""" +"""RSS 抓取:按源独立超时与错误隔离,单源失败不影响其他源""" import re +import socket from datetime import datetime, timezone import feedparser -from config import RSS_FEEDS, KEYWORDS +from config import KEYWORDS, FEED_TIMEOUT, get_feed_sources from parser_ai import classify_and_severity @@ -32,45 +33,62 @@ def _matches_keywords(text: str) -> bool: return False -def fetch_all() -> list[dict]: - import socket - items: list[dict] = [] - seen: set[str] = set() - # 单源超时 10 秒,避免某源卡住 +def _fetch_one_feed(name: str, url: str, timeout: int) -> list[dict]: + """抓取单个 RSS 源,超时或异常返回空列表。不负责去重。""" old_timeout = socket.getdefaulttimeout() - socket.setdefaulttimeout(10) + socket.setdefaulttimeout(timeout) try: - for url in RSS_FEEDS: - try: - feed = feedparser.parse( - url, - request_headers={"User-Agent": "US-Iran-Dashboard/1.0"}, - agent="US-Iran-Dashboard/1.0", - ) - except Exception: - continue - for entry in feed.entries: - title = getattr(entry, "title", "") or "" - raw_summary = getattr(entry, "summary", "") or getattr(entry, "description", "") or "" - summary = _strip_html(raw_summary) - link = getattr(entry, "link", "") or "" - text = f"{title} {summary}" - if not _matches_keywords(text): - continue - key = (title[:80], link) - if key in seen: - continue - seen.add(key) - published = _parse_date(entry) - cat, sev = classify_and_severity(text) - items.append({ - "title": title, - "summary": summary[:400] if summary else title, - "url": link, - "published": _parse_date(entry), - "category": cat, - "severity": sev, - }) + feed = feedparser.parse( + url, + request_headers={"User-Agent": "US-Iran-Dashboard/1.0"}, + agent="US-Iran-Dashboard/1.0", + ) + except Exception as e: + print(f" [rss] {name} error: {e}") + return [] finally: socket.setdefaulttimeout(old_timeout) + + out = [] + for entry in feed.entries: + title = getattr(entry, "title", "") or "" + raw_summary = getattr(entry, "summary", "") or getattr(entry, "description", "") or "" + summary = _strip_html(raw_summary) + link = getattr(entry, "link", "") or "" + text = f"{title} {summary}" + if not _matches_keywords(text): + continue + published = _parse_date(entry) + cat, sev = classify_and_severity(text) + out.append({ + "title": title, + "summary": summary[:400] if summary else title, + "url": link, + "published": published, + "category": cat, + "severity": sev, + "source": name, + }) + return out + + +def fetch_all() -> list[dict]: + """抓取所有配置的 RSS 源,按源超时与隔离错误,全局去重后返回。""" + sources = get_feed_sources() + if not sources: + return [] + + items: list[dict] = [] + seen: set[tuple[str, str]] = set() + + for name, url in sources: + batch = _fetch_one_feed(name, url, FEED_TIMEOUT) + for item in batch: + key = (item["title"][:80], item["url"]) + if key in seen: + continue + seen.add(key) + # 写入 DB 的 schema 不包含 source,可后续扩展 + items.append({k: v for k, v in item.items() if k != "source"}) + return items