Files
usa/crawler/extractor_rules.py
2026-03-02 23:21:07 +08:00

227 lines
10 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# -*- coding: utf-8 -*-
"""
基于规则的新闻数据提取(无需 Ollama
从新闻文本中提取战损、报复情绪等数值,供 db_merge 写入
"""
import re
from datetime import datetime, timezone
from typing import Any, Dict, Optional
def _first_int(text: str, pattern: str) -> Optional[int]:
m = re.search(pattern, text, re.I)
if m and m.group(1) and m.group(1).replace(",", "").isdigit():
return max(0, int(m.group(1).replace(",", "")))
return None
def extract_from_news(text: str, timestamp: Optional[str] = None) -> Dict[str, Any]:
"""
规则提取:匹配数字+关键词,输出符合 panel schema 的字段(无需 Ollama
"""
ts = timestamp or datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%S.000Z")
out: Dict[str, Any] = {}
t = (text or "").lower()
loss_us, loss_ir = {}, {}
# 美军人员伤亡(中文,优先匹配)
v = _first_int(t, r"造成\s*(\d+)\s*名?\s*美军\s*伤亡")
if v is not None:
loss_us["personnel_killed"] = v
v = _first_int(t, r"(\d+)\s*名?\s*美军\s*伤亡") if loss_us.get("personnel_killed") is None else None
if v is not None:
loss_us["personnel_killed"] = v
v = _first_int(t, r"(\d+)\s*名?\s*(?:美军|美国军队|美国)\s*(?:死亡|阵亡)")
if v is not None:
loss_us["personnel_killed"] = v
v = _first_int(t, r"(\d+)\s*名?\s*(?:美军|美国)\s*受伤")
if v is None and ("美军" in (text or "") or "美国" in (text or "")):
v = _first_int(text or t, r"另有\s*(\d+)\s*人\s*受伤")
if v is not None:
loss_us["personnel_wounded"] = v
v = _first_int(t, r"美军\s*伤亡\s*(\d+)")
if v is not None and loss_us.get("personnel_killed") is None:
loss_us["personnel_killed"] = v
# 美军人员伤亡(英文)
v = _first_int(t, r"(?:us|american|u\.?s\.?)[\s\w]*(?:say|report)[\s\w]*(\d+)[\s\w]*(?:troop|soldier|military)[\s\w]*(?:killed|dead)")
if v is not None:
loss_us["personnel_killed"] = v
v = _first_int(t, r"(\d+)[\s\w]*(?:us|american)[\s\w]*(?:troop|soldier|military)[\s\w]*(?:killed|dead)")
if v is not None:
loss_us["personnel_killed"] = v
v = _first_int(t, r"(?:us|american)[\s\w]*(\d+)[\s\w]*(?:wounded|injured)")
if v is not None:
loss_us["personnel_wounded"] = v
# 伊朗人员伤亡(中文)
v = _first_int(t, r"(\d+)\s*名?\s*伊朗\s*伤亡")
if v is not None:
loss_ir["personnel_killed"] = v
v = _first_int(t, r"(\d+)\s*名?\s*(?:伊朗|伊朗军队)[\s\w]*(?:死亡|阵亡)")
if v is not None:
loss_ir["personnel_killed"] = v
v = _first_int(t, r"(\d+)\s*名?\s*伊朗\s*受伤")
if v is not None:
loss_ir["personnel_wounded"] = v
# 伊朗人员伤亡(英文)
v = _first_int(t, r"(?:iran|iranian)[\s\w]*(?:say|report)[\s\w]*(\d+)[\s\w]*(?:troop|soldier|guard|killed|dead)")
if v is not None:
loss_ir["personnel_killed"] = v
v = _first_int(t, r"(\d+)[\s\w]*(?:iranian|iran)[\s\w]*(?:troop|soldier|guard|killed|dead)")
if v is not None:
loss_ir["personnel_killed"] = v
v = _first_int(t, r"(?:iran|iranian)[\s\w]*(\d+)[\s\w]*(?:wounded|injured)")
if v is not None:
loss_ir["personnel_wounded"] = v
# 平民伤亡(中英文,按阵营归属)
v = _first_int(t, r"(\d+)\s*名?\s*平民\s*(?:伤亡|死亡)")
if v is not None:
if "伊朗" in text or "iran" in t:
loss_ir["civilian_killed"] = v
else:
loss_us["civilian_killed"] = v
v = _first_int(t, r"(\d+)[\s\w]*(?:civilian|civil)[\s\w]*(?:killed|dead)") if loss_us.get("civilian_killed") is None and loss_ir.get("civilian_killed") is None else None
if v is not None:
if "iran" in t:
loss_ir["civilian_killed"] = v
else:
loss_us["civilian_killed"] = v
v = _first_int(t, r"(\d+)[\s\w]*(?:civilian|civil)[\s\w]*(?:wounded|injured)")
if v is not None:
if "iran" in t:
loss_ir["civilian_wounded"] = v
else:
loss_us["civilian_wounded"] = v
v = _first_int(text or t, r"伊朗[\s\w]*(?:空袭|打击)[\s\w]*造成[^\d]*(\d+)[\s\w]*(?:平民|人|伤亡)")
if v is not None:
loss_ir["civilian_killed"] = v
# 基地损毁(仅匹配 base/基地,排除"军事目标"等泛指)
skip_bases = "军事目标" in (text or "") and "基地" not in (text or "") and "base" not in t
if not skip_bases:
v = _first_int(t, r"(\d+)[\s\w]*(?:base|基地)[\s\w]*(?:destroyed|leveled|摧毁|夷平)")
if v is not None:
loss_us["bases_destroyed"] = v
v = _first_int(t, r"(\d+)[\s\w]*(?:base|基地)[\s\w]*(?:damaged|hit|struck|受损|袭击)")
if v is not None:
loss_us["bases_damaged"] = v
if ("base" in t or "基地" in t) and ("destroy" in t or "level" in t or "摧毁" in t or "夷平" in t) and not loss_us.get("bases_destroyed"):
loss_us["bases_destroyed"] = 1
if ("base" in t or "基地" in t) and ("damage" in t or "hit" in t or "struck" in t or "strike" in t or "袭击" in t or "受损" in t) and not loss_us.get("bases_damaged"):
loss_us["bases_damaged"] = 1
# 战机 / 舰船(根据上下文判断阵营)
v = _first_int(t, r"(\d+)[\s\w]*(?:aircraft|plane|jet|fighter|f-?16|f-?35|f-?18)[\s\w]*(?:down|destroyed|lost|shot)")
if v is not None:
if "us" in t or "american" in t or "u.s" in t:
loss_us["aircraft"] = v
elif "iran" in t:
loss_ir["aircraft"] = v
else:
loss_us["aircraft"] = v
v = _first_int(t, r"(\d+)[\s\w]*(?:ship|destroyer|warship|vessel)[\s\w]*(?:hit|damaged|sunk)")
if v is not None:
if "iran" in t:
loss_ir["warships"] = v
else:
loss_us["warships"] = v
# 无人机 drone / uav / 无人机
v = _first_int(t, r"(\d+)[\s\w]*(?:drone|uav|无人机)[\s\w]*(?:down|destroyed|shot|击落|摧毁)")
if v is None:
v = _first_int(text or t, r"(?:击落|摧毁)[^\d]*(\d+)[\s\w]*(?:drone|uav|无人机|架)")
if v is None:
v = _first_int(t, r"(?:drone|uav|无人机)[\s\w]*(\d+)[\s\w]*(?:down|destroyed|shot|击落|摧毁)")
if v is not None:
if "iran" in t or "iranian" in t or "shahed" in t or "沙希德" in t or "伊朗" in (text or ""):
loss_ir["drones"] = v
else:
loss_us["drones"] = v
# 导弹 missile / 导弹
v = _first_int(t, r"(\d+)[\s\w]*(?:missile|导弹)[\s\w]*(?:fired|launched|intercepted|destroyed|发射|拦截|击落)")
if v is not None:
if "iran" in t or "iranian" in t:
loss_ir["missiles"] = v
else:
loss_us["missiles"] = v
v = _first_int(t, r"(?:missile|导弹)[\s\w]*(\d+)[\s\w]*(?:fired|launched|intercepted|destroyed|发射|拦截)") if not loss_us.get("missiles") and not loss_ir.get("missiles") else None
if v is not None:
if "iran" in t:
loss_ir["missiles"] = v
else:
loss_us["missiles"] = v
# 直升机 helicopter / 直升机
v = _first_int(t, r"(\d+)[\s\w]*(?:helicopter|直升机)[\s\w]*(?:down|destroyed|crashed|crashes|击落|坠毁)")
if v is not None:
if "iran" in t or "iranian" in t:
loss_ir["helicopters"] = v
else:
loss_us["helicopters"] = v
# 潜艇 submarine / 潜艇
v = _first_int(t, r"(\d+)[\s\w]*(?:submarine|潜艇)[\s\w]*(?:sunk|damaged|hit|destroyed|击沉|受损)")
if v is not None:
if "iran" in t or "iranian" in t:
loss_ir["submarines"] = v
else:
loss_us["submarines"] = v
if loss_us:
out.setdefault("combat_losses_delta", {})["us"] = loss_us
if loss_ir:
out.setdefault("combat_losses_delta", {})["iran"] = loss_ir
if "retaliat" in t or "revenge" in t or "报复" in t or "反击" in t:
out["retaliation"] = {"value": 75, "time": ts}
if "wall street" in t or " dow " in t or "s&p" in t or "market slump" in t or "stock fall" in t or "美股" in t:
out["wall_street"] = {"time": ts, "value": 55}
# key_location_updates受袭基地与 key_location.name 匹配)
# 新闻提及基地遭袭时,更新对应基地 status放宽触发词以匹配更多英文报道
attack_words = ("attack" in t or "attacked" in t or "hit" in t or "strike" in t or "struck" in t or "strikes" in t
or "damage" in t or "damaged" in t or "target" in t or "targeted" in t or "bomb" in t or "bombed" in t
or "袭击" in (text or "") or "遭袭" in (text or "") or "打击" in (text or "") or "受损" in (text or "") or "摧毁" in (text or ""))
base_attacked = ("base" in t or "基地" in t or "outpost" in t or "facility" in t) and attack_words
if base_attacked:
updates: list = []
# 常见美军基地关键词 -> name_keywords用于 db_merge 的 LIKE 匹配,需与 key_location.name 能匹配)
bases_all = [
("阿萨德|阿因|asad|assad|ain", "us"),
("巴格达|baghdad", "us"),
("乌代德|udeid|卡塔尔|qatar", "us"),
("阿克罗蒂里|akrotiri|塞浦路斯|cyprus", "us"),
("巴格拉姆|bagram|阿富汗|afghanistan", "us"),
("埃尔比勒|erbil", "us"),
("因吉尔利克|incirlik|土耳其|turkey", "us"),
("苏尔坦|sultan|沙特|saudi", "us"),
("坦夫|tanf|叙利亚|syria", "us"),
("达夫拉|dhafra|阿联酋|uae", "us"),
("内瓦提姆|nevatim|拉蒙|ramon|以色列|israel", "us"),
("赛利耶|sayliyah", "us"),
("巴林|bahrain", "us"),
("科威特|kuwait", "us"),
# 伊朗基地
("阿巴斯港|abbas|bandar abbas", "iran"),
("德黑兰|tehran", "iran"),
("布什尔|bushehr", "iran"),
("伊斯法罕|isfahan|esfahan", "iran"),
("纳坦兹|natanz", "iran"),
("米纳布|minab", "iran"),
("卡拉季|karaj", "iran"),
("克尔曼沙赫|kermanshah", "iran"),
("大不里士|tabriz", "iran"),
("霍尔木兹|hormuz", "iran"),
]
for kws, side in bases_all:
if any(k in t for k in kws.split("|")):
updates.append({"name_keywords": kws, "side": side, "status": "attacked", "damage_level": 2})
if updates:
out["key_location_updates"] = updates
return out