Files
usa/crawler/extractor_rules.py
2026-03-02 15:35:40 +08:00

124 lines
5.5 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# -*- coding: utf-8 -*-
"""
基于规则的新闻数据提取(无需 Ollama)
从新闻文本中提取战损、报复情绪等数值,供 db_merge 写入
"""
import re
from datetime import datetime, timezone
from typing import Any, Dict, Optional
def _first_int(text: str, pattern: str) -> Optional[int]:
m = re.search(pattern, text, re.I)
if m and m.group(1) and m.group(1).replace(",", "").isdigit():
return max(0, int(m.group(1).replace(",", "")))
return None
# Number token: comma-grouped thousands ("1,200") or a plain digit run. The
# look-behind keeps a preceding greedy gap from consuming the leading digits,
# which would otherwise leave only the final digit for the capture group
# ("150" read as 0).
_NUM = r"(?<!\d)(\d{1,3}(?:,\d{3})+|\d+)"
# Filler allowed between the tokens of a rule: word characters and whitespace.
_GAP = r"[\s\w]*"
# Whole-word US marker. A bare substring test for "us" also fires on
# "russia", "houthis", "thus", ...
_US = r"\b(?:u\.s\.|(?:u\.s|us|americans?)\b)"
_IRAN = r"(?:iranian|iran)"

# (side, field, pattern) rules, applied in order; when two rules target the
# same field the later match overwrites the earlier one. Group 1 of every
# pattern is the count.
_COUNT_RULES = (
    # US military casualties
    ("us", "personnel_killed",
     _US + _GAP + r"(?:say|report)" + _GAP + _NUM + _GAP
     + r"(?:troop|soldier|military)" + _GAP + r"(?:killed|dead)"),
    ("us", "personnel_killed",
     _NUM + _GAP + _US + _GAP
     + r"(?:troop|soldier|military)" + _GAP + r"(?:killed|dead)"),
    ("us", "personnel_wounded",
     _US + _GAP + _NUM + _GAP + r"(?:wounded|injured)"),
    # Iranian military casualties
    ("iran", "personnel_killed",
     _IRAN + _GAP + r"(?:say|report)" + _GAP + _NUM + _GAP
     + r"(?:troop|soldier|guard|killed|dead)"),
    ("iran", "personnel_killed",
     _NUM + _GAP + _IRAN + _GAP + r"(?:troop|soldier|guard|killed|dead)"),
    ("iran", "personnel_wounded",
     _IRAN + _GAP + _NUM + _GAP + r"(?:wounded|injured)"),
    # Civilian casualties are not attributed to a side by the text; they are
    # booked under "us".
    ("us", "civilian_killed",
     _NUM + _GAP + r"(?:civilian|civil)" + _GAP + r"(?:killed|dead)"),
    ("us", "civilian_wounded",
     _NUM + _GAP + r"(?:civilian|civil)" + _GAP + r"(?:wounded|injured)"),
    # Bases (English and Chinese keywords), booked under "us".
    ("us", "bases_destroyed",
     _NUM + _GAP + r"(?:base|基地)" + _GAP + r"(?:destroyed|leveled|摧毁|夷平)"),
    ("us", "bases_damaged",
     _NUM + _GAP + r"(?:base|基地)" + _GAP + r"(?:damaged|hit|struck|受损|袭击)"),
)

# NOTE(review): a type designator such as "f-16" can still be read as a count
# by this rule ("f-16 jet shot down" -> 16); confirm whether that matters.
_AIRCRAFT_RULE = (
    _NUM + _GAP + r"(?:aircraft|plane|jet|fighter|f-?16|f-?35|f-?18)"
    + _GAP + r"(?:down|destroyed|lost|shot)"
)
_WARSHIP_RULE = (
    _NUM + _GAP + r"(?:ship|destroyer|warship|vessel)" + _GAP + r"(?:hit|damaged|sunk)"
)

# Base keyword groups ("|"-separated, Chinese and English) and the side they
# are reported under. The keyword string is emitted unchanged as
# ``name_keywords``; per the original comment it is used by db_merge for LIKE
# matching against key_location.name.
_KEY_BASES = (
    ("阿萨德|阿因|asad|assad|ain", "us"),
    ("巴格达|baghdad", "us"),
    ("乌代德|udeid|卡塔尔|qatar", "us"),
    ("阿克罗蒂里|akrotiri|塞浦路斯|cyprus", "us"),
    ("巴格拉姆|bagram|阿富汗|afghanistan", "us"),
    ("埃尔比勒|erbil", "us"),
    ("因吉尔利克|incirlik|土耳其|turkey", "us"),
    ("苏尔坦|sultan|沙特|saudi", "us"),
    ("坦夫|tanf|叙利亚|syria", "us"),
    ("达夫拉|dhafra|阿联酋|uae", "us"),
    ("内瓦提姆|nevatim|拉蒙|ramon|以色列|israel", "us"),
    ("赛利耶|sayliyah", "us"),
    ("巴林|bahrain", "us"),
    ("科威特|kuwait", "us"),
)


def _mentions_any(text: str, keywords: str) -> bool:
    """Return True if *text* mentions any of the "|"-separated *keywords*."""
    for keyword in keywords.split("|"):
        if keyword.isascii():
            # Anchor ASCII keywords at a word start so that short ones such as
            # "ain" do not fire inside "against" or "main", while inflected
            # forms ("israeli", "syrian") still match.
            if re.search(r"\b" + re.escape(keyword), text):
                return True
        elif keyword in text:
            # CJK text has no word boundaries to anchor on; substring match.
            return True
    return False


def extract_from_news(text: str, timestamp: Optional[str] = None) -> Dict[str, Any]:
    """Extract panel fields from a news text using keyword/number rules.

    The text is lower-cased and scanned with fixed patterns; no external
    model is involved.

    Args:
        text: News text (English and/or Chinese). ``None`` or empty is
            treated as an empty string.
        timestamp: Value used for the ``time`` fields. Defaults to the
            current UTC time formatted as ``%Y-%m-%dT%H:%M:%S.000Z``.

    Returns:
        A dict containing only the keys whose rules fired:

        * ``combat_losses_delta``: ``{"us": {...}, "iran": {...}}`` with the
          counts found for each side (sides without counts are omitted).
        * ``retaliation``: ``{"value": 75, "time": ts}`` when retaliation is
          mentioned.
        * ``wall_street``: ``{"time": ts, "value": 55}`` when US markets are
          mentioned.
        * ``key_location_updates``: one entry per known base mentioned in a
          text that also reports a base being attacked.
    """
    ts = timestamp or datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%S.000Z")
    t = (text or "").lower()
    losses: Dict[str, Dict[str, int]] = {"us": {}, "iran": {}}
    loss_us, loss_ir = losses["us"], losses["iran"]

    for side, field, pattern in _COUNT_RULES:
        count = _first_int(t, pattern)
        if count is not None:
            losses[side][field] = count

    # A base reported as destroyed/damaged without a usable count is booked
    # as one base.
    # NOTE(review): these trigger words are plain substring tests, so "hit"
    # also fires inside "white"; left unchanged here.
    mentions_base = "base" in t or "基地" in t
    if mentions_base:
        if not loss_us.get("bases_destroyed") and any(
            word in t for word in ("destroy", "level", "摧毁", "夷平")
        ):
            loss_us["bases_destroyed"] = 1
        if not loss_us.get("bases_damaged") and any(
            word in t for word in ("damage", "hit", "struck", "strike", "袭击", "受损")
        ):
            loss_us["bases_damaged"] = 1

    # Aircraft: attributed to Iran only when Iran is mentioned and the US is
    # not; every other case is booked under "us".
    count = _first_int(t, _AIRCRAFT_RULE)
    if count is not None:
        mentions_us = re.search(_US, t) is not None
        side = "iran" if ("iran" in t and not mentions_us) else "us"
        losses[side]["aircraft"] = count

    # Warships: attributed to Iran whenever Iran is mentioned, otherwise "us".
    count = _first_int(t, _WARSHIP_RULE)
    if count is not None:
        side = "iran" if "iran" in t else "us"
        losses[side]["warships"] = count

    out: Dict[str, Any] = {}
    deltas = {side: counts for side, counts in losses.items() if counts}
    if deltas:
        out["combat_losses_delta"] = deltas

    if "retaliat" in t or "revenge" in t or "报复" in t:
        out["retaliation"] = {"value": 75, "time": ts}

    if any(
        marker in t
        for marker in ("wall street", " dow ", "s&p", "market slump", "stock fall", "美股")
    ):
        out["wall_street"] = {"time": ts, "value": 55}

    # When the text reports an attack on a base, mark every known base it
    # names as attacked.
    base_attacked = mentions_base and any(
        word in t for word in ("attack", "hit", "strike", "damage", "袭击", "打击")
    )
    if base_attacked:
        updates = [
            {"name_keywords": kws, "side": side, "status": "attacked", "damage_level": 2}
            for kws, side in _KEY_BASES
            if _mentions_any(t, kws)
        ]
        if updates:
            out["key_location_updates"] = updates

    return out