# -*- coding: utf-8 -*-
"""
Rule-based news data extraction (no Ollama required).

Extracts combat-loss figures, retaliation sentiment and similar values from
news text so that db_merge can write them.
"""
import re
from datetime import datetime, timezone
from typing import Any, Dict, Optional

# A comma used as a thousands separator inside a number ("1,200"). It is
# removed before matching so that a (\d+) group captures the whole figure.
_THOUSANDS_SEP = re.compile(r"(?<=\d),(?=\d{3}(?!\d))")

# Number capture used after a greedy [\s\w]* run. Without the lookbehind the
# greedy run swallows the leading digits and only the last digit is captured.
_NUM = r"(?<!\d)(\d+)"

# English mentions of the US side. "us" must stand alone as a word so that it
# does not fire inside "russian", "status", "thus", ...
_US_EN = r"(?<![a-z])(?:us(?![a-z])|american)"
_US_EN_DOTTED = r"(?<![a-z])(?:u\.?s\.?(?![a-z])|american)"
_US_MENTION = re.compile(r"(?<![a-z])(?:us(?![a-z])|u\.s|american)")

# Substrings that mark a report as describing an attack. Inflected forms such
# as "attacked" or "strikes" are already covered by their stems.
_ATTACK_WORDS_EN = ("attack", "hit", "strike", "struck", "damage", "target", "bomb")
_ATTACK_WORDS_ZH = ("袭击", "遭袭", "打击", "受损", "摧毁")

# (name_keywords, side) pairs. name_keywords is emitted verbatim so that
# db_merge can match it against key_location.name.
_KEY_LOCATIONS = (
    # US bases
    ("阿萨德|阿因|asad|assad|ain", "us"),
    ("巴格达|baghdad", "us"),
    ("乌代德|udeid|卡塔尔|qatar", "us"),
    ("阿克罗蒂里|akrotiri|塞浦路斯|cyprus", "us"),
    ("巴格拉姆|bagram|阿富汗|afghanistan", "us"),
    ("埃尔比勒|erbil", "us"),
    ("因吉尔利克|incirlik|土耳其|turkey", "us"),
    ("苏尔坦|sultan|沙特|saudi", "us"),
    ("坦夫|tanf|叙利亚|syria", "us"),
    ("达夫拉|dhafra|阿联酋|uae", "us"),
    ("内瓦提姆|nevatim|拉蒙|ramon|以色列|israel", "us"),
    ("赛利耶|sayliyah", "us"),
    ("巴林|bahrain", "us"),
    ("科威特|kuwait", "us"),
    # Iranian bases
    ("阿巴斯港|abbas|bandar abbas", "iran"),
    ("德黑兰|tehran", "iran"),
    ("布什尔|bushehr", "iran"),
    ("伊斯法罕|isfahan|esfahan", "iran"),
    ("纳坦兹|natanz", "iran"),
    ("米纳布|minab", "iran"),
    ("卡拉季|karaj", "iran"),
    ("克尔曼沙赫|kermanshah", "iran"),
    ("大不里士|tabriz", "iran"),
    ("霍尔木兹|hormuz", "iran"),
)


def _first_int(text: str, pattern: str) -> Optional[int]:
    """Return group 1 of the first match of *pattern* in *text* as an int.

    Matching is case-insensitive. Thousands-separator commas are stripped from
    *text* before matching. Returns None when *text* is empty, when nothing
    matches, or when the captured group is not purely digits.
    """
    if not text:
        return None
    m = re.search(pattern, _THOUSANDS_SEP.sub("", text), re.I)
    if not m:
        return None
    # Callers may still capture commas themselves (e.g. "[\d,]+").
    digits = (m.group(1) or "").replace(",", "")
    if not digits.isdigit():
        return None
    return max(0, int(digits))


def _mentions_us(lowered: str) -> bool:
    """Tell whether lower-cased text names the US side as a separate word."""
    return _US_MENTION.search(lowered) is not None


def _keyword_in(lowered: str, keyword: str) -> bool:
    """Tell whether *keyword* occurs in lower-cased text.

    ASCII keywords must not be preceded by a letter, so short ones such as
    "ain" do not fire inside "against" or "bahrain"; suffixes ("israeli",
    "saudis") are still accepted. Non-ASCII keywords match as substrings.
    """
    if keyword.isascii():
        return re.search(r"(?<![a-z])" + re.escape(keyword), lowered) is not None
    return keyword in lowered


def _credit(value: Optional[int], key: str, to_iran: bool,
            loss_us: dict, loss_ir: dict) -> None:
    """Store *value* under *key* for the chosen side; do nothing for None."""
    if value is None:
        return
    (loss_ir if to_iran else loss_us)[key] = value


def _extract_personnel(raw: str, t: str, loss_us: dict, loss_ir: dict) -> None:
    """Fill military and civilian casualty counts into the two loss dicts."""
    # US military casualties (Chinese, matched first)
    v = _first_int(t, r"造成\s*(\d+)\s*名?\s*美军\s*伤亡")
    if v is not None:
        loss_us["personnel_killed"] = v
    if loss_us.get("personnel_killed") is None:
        v = _first_int(t, r"(\d+)\s*名?\s*美军\s*伤亡")
        if v is not None:
            loss_us["personnel_killed"] = v
    v = _first_int(t, r"(\d+)\s*名?\s*(?:美军|美国军队|美国)\s*(?:死亡|阵亡)")
    if v is not None:
        loss_us["personnel_killed"] = v
    v = _first_int(t, r"(\d+)\s*名?\s*(?:美军|美国)\s*受伤")
    if v is None and ("美军" in raw or "美国" in raw):
        # "another N people wounded" only counts when the US is mentioned
        v = _first_int(raw, r"另有\s*(\d+)\s*人\s*受伤")
    if v is not None:
        loss_us["personnel_wounded"] = v
    v = _first_int(t, r"美军\s*伤亡\s*(\d+)")
    if v is not None and loss_us.get("personnel_killed") is None:
        loss_us["personnel_killed"] = v

    # US military casualties (English)
    v = _first_int(
        t,
        _US_EN_DOTTED + r"[\s\w]*(?:say|report)[\s\w]*" + _NUM
        + r"[\s\w]*(?:troop|soldier|military)[\s\w]*(?:killed|dead)",
    )
    if v is not None:
        loss_us["personnel_killed"] = v
    v = _first_int(
        t,
        r"(\d+)[\s\w]*" + _US_EN
        + r"[\s\w]*(?:troop|soldier|military)[\s\w]*(?:killed|dead)",
    )
    if v is not None:
        loss_us["personnel_killed"] = v
    v = _first_int(t, _US_EN + r"[\s\w]*" + _NUM + r"[\s\w]*(?:wounded|injured)")
    if v is not None:
        loss_us["personnel_wounded"] = v

    # Iranian military casualties (Chinese)
    v = _first_int(t, r"(\d+)\s*名?\s*伊朗\s*伤亡")
    if v is not None:
        loss_ir["personnel_killed"] = v
    v = _first_int(t, r"(\d+)\s*名?\s*(?:伊朗|伊朗军队)[\s\w]*(?:死亡|阵亡)")
    if v is not None:
        loss_ir["personnel_killed"] = v
    v = _first_int(t, r"(\d+)\s*名?\s*伊朗\s*受伤")
    if v is not None:
        loss_ir["personnel_wounded"] = v

    # Iranian military casualties (English)
    v = _first_int(
        t,
        r"(?:iran|iranian)[\s\w]*(?:say|report)[\s\w]*" + _NUM
        + r"[\s\w]*(?:troop|soldier|guard|killed|dead)",
    )
    if v is not None:
        loss_ir["personnel_killed"] = v
    v = _first_int(
        t, r"(\d+)[\s\w]*(?:iranian|iran)[\s\w]*(?:troop|soldier|guard|killed|dead)"
    )
    if v is not None:
        loss_ir["personnel_killed"] = v
    v = _first_int(t, r"(?:iran|iranian)[\s\w]*" + _NUM + r"[\s\w]*(?:wounded|injured)")
    if v is not None:
        loss_ir["personnel_wounded"] = v

    # Civilian casualties (Chinese and English, attributed by side)
    iran_en = "iran" in t
    v = _first_int(t, r"(\d+)\s*名?\s*平民\s*(?:伤亡|死亡)")
    _credit(v, "civilian_killed", "伊朗" in raw or iran_en, loss_us, loss_ir)
    if (loss_us.get("civilian_killed") is None
            and loss_ir.get("civilian_killed") is None):
        v = _first_int(t, r"(\d+)[\s\w]*(?:civilian|civil)[\s\w]*(?:killed|dead)")
        _credit(v, "civilian_killed", iran_en, loss_us, loss_ir)
    v = _first_int(t, r"(\d+)[\s\w]*(?:civilian|civil)[\s\w]*(?:wounded|injured)")
    _credit(v, "civilian_wounded", iran_en, loss_us, loss_ir)
    v = _first_int(
        raw, r"伊朗[\s\w]*(?:空袭|打击)[\s\w]*造成[^\d]*(\d+)[\s\w]*(?:平民|人|伤亡)"
    )
    if v is not None:
        loss_ir["civilian_killed"] = v


def _extract_base_losses(raw: str, t: str, loss_us: dict) -> None:
    """Fill destroyed/damaged base counts (always recorded on the US side)."""
    # Only "base"/"基地" count; a generic "军事目标" (military target) does not.
    if "军事目标" in raw and "基地" not in raw and "base" not in t:
        return
    v = _first_int(t, r"(\d+)[\s\w]*(?:base|基地)[\s\w]*(?:destroyed|leveled|摧毁|夷平)")
    if v is not None:
        loss_us["bases_destroyed"] = v
    v = _first_int(t, r"(\d+)[\s\w]*(?:base|基地)[\s\w]*(?:damaged|hit|struck|受损|袭击)")
    if v is not None:
        loss_us["bases_damaged"] = v

    # No usable count: a base plus a damage word still records one base.
    has_base = "base" in t or "基地" in t
    if (has_base and any(w in t for w in ("destroy", "level", "摧毁", "夷平"))
            and not loss_us.get("bases_destroyed")):
        loss_us["bases_destroyed"] = 1
    if (has_base
            and any(w in t for w in ("damage", "hit", "struck", "strike", "袭击", "受损"))
            and not loss_us.get("bases_damaged")):
        loss_us["bases_damaged"] = 1


def _extract_equipment(raw: str, t: str, loss_us: dict, loss_ir: dict) -> None:
    """Fill aircraft, warship, drone, missile, helicopter and submarine counts."""
    iran_en = "iran" in t

    # Aircraft: a US mention wins, then Iran, otherwise default to the US side.
    v = _first_int(
        t,
        r"(\d+)[\s\w]*(?:aircraft|plane|jet|fighter|f-?16|f-?35|f-?18)"
        r"[\s\w]*(?:down|destroyed|lost|shot)",
    )
    _credit(v, "aircraft", iran_en and not _mentions_us(t), loss_us, loss_ir)

    # Warships
    v = _first_int(
        t, r"(\d+)[\s\w]*(?:ship|destroyer|warship|vessel)[\s\w]*(?:hit|damaged|sunk)"
    )
    _credit(v, "warships", iran_en, loss_us, loss_ir)

    # Drones: drone / uav / 无人机
    v = _first_int(
        t, r"(\d+)[\s\w]*(?:drone|uav|无人机)[\s\w]*(?:down|destroyed|shot|击落|摧毁)"
    )
    if v is None:
        v = _first_int(raw, r"(?:击落|摧毁)[^\d]*(\d+)[\s\w]*(?:drone|uav|无人机|架)")
    if v is None:
        v = _first_int(
            t,
            r"(?:drone|uav|无人机)[\s\w]*" + _NUM
            + r"[\s\w]*(?:down|destroyed|shot|击落|摧毁)",
        )
    drone_is_iranian = iran_en or "shahed" in t or "沙希德" in t or "伊朗" in raw
    _credit(v, "drones", drone_is_iranian, loss_us, loss_ir)

    # Missiles: missile / 导弹
    v = _first_int(
        t,
        r"(\d+)[\s\w]*(?:missile|导弹)"
        r"[\s\w]*(?:fired|launched|intercepted|destroyed|发射|拦截|击落)",
    )
    _credit(v, "missiles", iran_en, loss_us, loss_ir)
    if not loss_us.get("missiles") and not loss_ir.get("missiles"):
        v = _first_int(
            t,
            r"(?:missile|导弹)[\s\w]*" + _NUM
            + r"[\s\w]*(?:fired|launched|intercepted|destroyed|发射|拦截)",
        )
        _credit(v, "missiles", iran_en, loss_us, loss_ir)

    # Helicopters: helicopter / 直升机
    v = _first_int(
        t,
        r"(\d+)[\s\w]*(?:helicopter|直升机)"
        r"[\s\w]*(?:down|destroyed|crashed|crashes|击落|坠毁)",
    )
    _credit(v, "helicopters", iran_en, loss_us, loss_ir)

    # Submarines: submarine / 潜艇
    v = _first_int(
        t,
        r"(\d+)[\s\w]*(?:submarine|潜艇)[\s\w]*(?:sunk|damaged|hit|destroyed|击沉|受损)",
    )
    _credit(v, "submarines", iran_en, loss_us, loss_ir)


def _extract_location_updates(raw: str, t: str) -> list:
    """Return key_location updates for every known base named in an attack report."""
    site_mentioned = "base" in t or "基地" in t or "outpost" in t or "facility" in t
    attack_mentioned = (any(w in t for w in _ATTACK_WORDS_EN)
                        or any(w in raw for w in _ATTACK_WORDS_ZH))
    if not (site_mentioned and attack_mentioned):
        return []
    return [
        {"name_keywords": kws, "side": side, "status": "attacked", "damage_level": 2}
        for kws, side in _KEY_LOCATIONS
        if any(_keyword_in(t, k) for k in kws.split("|"))
    ]


def extract_from_news(text: str, timestamp: Optional[str] = None) -> Dict[str, Any]:
    """Extract panel fields from a news text using number + keyword rules.

    Args:
        text: News text in Chinese and/or English. None or an empty string
            yields an empty result.
        timestamp: Value stored in the "time" fields. Defaults to the current
            UTC time formatted as "%Y-%m-%dT%H:%M:%S.000Z".

    Returns:
        A dict containing only the keys that were detected:
        "combat_losses_delta" ({"us": {...}, "iran": {...}}),
        "retaliation" (fixed value 75), "wall_street" (fixed value 55) and
        "key_location_updates" (list of attacked-base updates).
    """
    ts = timestamp or datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%S.000Z")
    out: Dict[str, Any] = {}
    raw = text or ""
    t = raw.lower()

    loss_us: Dict[str, int] = {}
    loss_ir: Dict[str, int] = {}
    _extract_personnel(raw, t, loss_us, loss_ir)
    _extract_base_losses(raw, t, loss_us)
    _extract_equipment(raw, t, loss_us, loss_ir)

    if loss_us:
        out.setdefault("combat_losses_delta", {})["us"] = loss_us
    if loss_ir:
        out.setdefault("combat_losses_delta", {})["iran"] = loss_ir

    if "retaliat" in t or "revenge" in t or "报复" in t or "反击" in t:
        out["retaliation"] = {"value": 75, "time": ts}
    if ("wall street" in t or " dow " in t or "s&p" in t or "market slump" in t
            or "stock fall" in t or "美股" in t):
        out["wall_street"] = {"time": ts, "value": 55}

    # key_location_updates: bases reported as attacked. The trigger words are
    # deliberately broad so that more English-language reports are caught.
    updates = _extract_location_updates(raw, t)
    if updates:
        out["key_location_updates"] = updates
    return out