Files
usa/crawler/extractor_rules.py
2026-03-02 16:29:11 +08:00

166 lines
7.3 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# -*- coding: utf-8 -*-
"""
基于规则的新闻数据提取(无需 Ollama
从新闻文本中提取战损、报复情绪等数值,供 db_merge 写入
"""
import re
from datetime import datetime, timezone
from typing import Any, Dict, Optional
def _first_int(text: str, pattern: str) -> Optional[int]:
m = re.search(pattern, text, re.I)
if m and m.group(1) and m.group(1).replace(",", "").isdigit():
return max(0, int(m.group(1).replace(",", "")))
return None
def extract_from_news(text: str, timestamp: Optional[str] = None) -> Dict[str, Any]:
"""
规则提取:匹配数字+关键词,输出符合 panel schema 的字段(无需 Ollama
"""
ts = timestamp or datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%S.000Z")
out: Dict[str, Any] = {}
t = (text or "").lower()
loss_us, loss_ir = {}, {}
# 美军人员伤亡(中文,优先匹配)
v = _first_int(t, r"造成\s*(\d+)\s*名?\s*美军\s*伤亡")
if v is not None:
loss_us["personnel_killed"] = v
v = _first_int(t, r"(\d+)\s*名?\s*美军\s*伤亡") if loss_us.get("personnel_killed") is None else None
if v is not None:
loss_us["personnel_killed"] = v
v = _first_int(t, r"(\d+)\s*名?\s*(?:美军|美国军队|美国)\s*(?:死亡|阵亡)")
if v is not None:
loss_us["personnel_killed"] = v
v = _first_int(t, r"(\d+)\s*名?\s*(?:美军|美国)\s*受伤")
if v is not None:
loss_us["personnel_wounded"] = v
v = _first_int(t, r"美军\s*伤亡\s*(\d+)")
if v is not None and loss_us.get("personnel_killed") is None:
loss_us["personnel_killed"] = v
# 美军人员伤亡(英文)
v = _first_int(t, r"(?:us|american|u\.?s\.?)[\s\w]*(?:say|report)[\s\w]*(\d+)[\s\w]*(?:troop|soldier|military)[\s\w]*(?:killed|dead)")
if v is not None:
loss_us["personnel_killed"] = v
v = _first_int(t, r"(\d+)[\s\w]*(?:us|american)[\s\w]*(?:troop|soldier|military)[\s\w]*(?:killed|dead)")
if v is not None:
loss_us["personnel_killed"] = v
v = _first_int(t, r"(?:us|american)[\s\w]*(\d+)[\s\w]*(?:wounded|injured)")
if v is not None:
loss_us["personnel_wounded"] = v
# 伊朗人员伤亡(中文)
v = _first_int(t, r"(\d+)\s*名?\s*伊朗\s*伤亡")
if v is not None:
loss_ir["personnel_killed"] = v
v = _first_int(t, r"(\d+)\s*名?\s*(?:伊朗|伊朗军队)\s*(?:死亡|阵亡)")
if v is not None:
loss_ir["personnel_killed"] = v
v = _first_int(t, r"(\d+)\s*名?\s*伊朗\s*受伤")
if v is not None:
loss_ir["personnel_wounded"] = v
# 伊朗人员伤亡(英文)
v = _first_int(t, r"(?:iran|iranian)[\s\w]*(?:say|report)[\s\w]*(\d+)[\s\w]*(?:troop|soldier|guard|killed|dead)")
if v is not None:
loss_ir["personnel_killed"] = v
v = _first_int(t, r"(\d+)[\s\w]*(?:iranian|iran)[\s\w]*(?:troop|soldier|guard|killed|dead)")
if v is not None:
loss_ir["personnel_killed"] = v
v = _first_int(t, r"(?:iran|iranian)[\s\w]*(\d+)[\s\w]*(?:wounded|injured)")
if v is not None:
loss_ir["personnel_wounded"] = v
# 平民伤亡(中英文)
v = _first_int(t, r"(\d+)\s*名?\s*平民\s*(?:伤亡|死亡)")
if v is not None:
loss_us["civilian_killed"] = v
v = _first_int(t, r"(\d+)[\s\w]*(?:civilian|civil)[\s\w]*(?:killed|dead)") if loss_us.get("civilian_killed") is None else None
if v is not None:
loss_us["civilian_killed"] = v
v = _first_int(t, r"(\d+)[\s\w]*(?:civilian|civil)[\s\w]*(?:wounded|injured)")
if v is not None:
loss_us["civilian_wounded"] = v
# 基地损毁(美方基地居多)+ 中文
v = _first_int(t, r"(\d+)[\s\w]*(?:base|基地)[\s\w]*(?:destroyed|leveled|摧毁|夷平)")
if v is not None:
loss_us["bases_destroyed"] = v
v = _first_int(t, r"(\d+)[\s\w]*(?:base|基地)[\s\w]*(?:damaged|hit|struck|受损|袭击)")
if v is not None:
loss_us["bases_damaged"] = v
if ("base" in t or "基地" in t) and ("destroy" in t or "level" in t or "摧毁" in t or "夷平" in t) and not loss_us.get("bases_destroyed"):
loss_us["bases_destroyed"] = 1
if ("base" in t or "基地" in t) and ("damage" in t or "hit" in t or "struck" in t or "strike" in t or "袭击" in t or "受损" in t) and not loss_us.get("bases_damaged"):
loss_us["bases_damaged"] = 1
# 战机 / 舰船(根据上下文判断阵营)
v = _first_int(t, r"(\d+)[\s\w]*(?:aircraft|plane|jet|fighter|f-?16|f-?35|f-?18)[\s\w]*(?:down|destroyed|lost|shot)")
if v is not None:
if "us" in t or "american" in t or "u.s" in t:
loss_us["aircraft"] = v
elif "iran" in t:
loss_ir["aircraft"] = v
else:
loss_us["aircraft"] = v
v = _first_int(t, r"(\d+)[\s\w]*(?:ship|destroyer|warship|vessel)[\s\w]*(?:hit|damaged|sunk)")
if v is not None:
if "iran" in t:
loss_ir["warships"] = v
else:
loss_us["warships"] = v
if loss_us:
out.setdefault("combat_losses_delta", {})["us"] = loss_us
if loss_ir:
out.setdefault("combat_losses_delta", {})["iran"] = loss_ir
if "retaliat" in t or "revenge" in t or "报复" in t or "反击" in t:
out["retaliation"] = {"value": 75, "time": ts}
if "wall street" in t or " dow " in t or "s&p" in t or "market slump" in t or "stock fall" in t or "美股" in t:
out["wall_street"] = {"time": ts, "value": 55}
# key_location_updates受袭基地与 key_location.name 匹配)
# 新闻提及基地遭袭时,更新对应基地 status
base_attacked = ("base" in t or "基地" in t) and ("attack" in t or "hit" in t or "strike" in t or "damage" in t or "袭击" in t or "打击" in t)
if base_attacked:
updates: list = []
# 常见美军基地关键词 -> name_keywords用于 db_merge 的 LIKE 匹配)
bases_all = [
("阿萨德|阿因|asad|assad|ain", "us"),
("巴格达|baghdad", "us"),
("乌代德|udeid|卡塔尔|qatar", "us"),
("阿克罗蒂里|akrotiri|塞浦路斯|cyprus", "us"),
("巴格拉姆|bagram|阿富汗|afghanistan", "us"),
("埃尔比勒|erbil", "us"),
("因吉尔利克|incirlik|土耳其|turkey", "us"),
("苏尔坦|sultan|沙特|saudi", "us"),
("坦夫|tanf|叙利亚|syria", "us"),
("达夫拉|dhafra|阿联酋|uae", "us"),
("内瓦提姆|nevatim|拉蒙|ramon|以色列|israel", "us"),
("赛利耶|sayliyah", "us"),
("巴林|bahrain", "us"),
("科威特|kuwait", "us"),
# 伊朗基地
("阿巴斯港|abbas|bandar abbas", "iran"),
("德黑兰|tehran", "iran"),
("布什尔|bushehr", "iran"),
("伊斯法罕|isfahan|esfahan", "iran"),
("纳坦兹|natanz", "iran"),
("米纳布|minab", "iran"),
("卡拉季|karaj", "iran"),
("克尔曼沙赫|kermanshah", "iran"),
("大不里士|tabriz", "iran"),
("霍尔木兹|hormuz", "iran"),
]
for kws, side in bases_all:
if any(k in t for k in kws.split("|")):
updates.append({"name_keywords": kws, "side": side, "status": "attacked", "damage_level": 2})
if updates:
out["key_location_updates"] = updates
return out