Files
usa/crawler/extractor_ai.py
2026-03-03 13:02:28 +08:00

131 lines
6.6 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# -*- coding: utf-8 -*-
"""
从新闻文本中 AI 提取结构化数据,映射到面板 schema
输出符合 panel_schema 的字段,供 db_merge 写入
"""
import json
import math
import os
import re
from datetime import datetime, timezone
from typing import Any, Dict, List, Optional

from panel_schema import validate_category, validate_severity, validate_summary
# True when the environment variable CLEANER_AI_DISABLED is exactly "1".
CLEANER_AI_DISABLED = os.getenv("CLEANER_AI_DISABLED", "0") == "1"
# Ollama model name sent with each request; defaults to "llama3.1".
OLLAMA_MODEL = os.getenv("OLLAMA_MODEL", "llama3.1")
# Maximum number of source-text characters used for AI extraction (kept large
# so that, when the article body is available, precise figures can be found).
EXTRACT_TEXT_MAX_LEN = int(os.getenv("EXTRACT_TEXT_MAX_LEN", "4000"))
def _call_ollama_extract(text: str, timeout: int = 15) -> Optional[Dict[str, Any]]:
    """Ask the local Ollama chat endpoint to extract structured data from news text.

    The text is stripped, truncated to ``EXTRACT_TEXT_MAX_LEN`` characters and
    embedded in a prompt that instructs the model to output only the figures
    and facts explicitly stated in the report, as JSON.

    Args:
        text: Full text or summary of a news item.
        timeout: Timeout in seconds passed to ``requests.post``.

    Returns:
        The decoded JSON object as a dict, or ``None`` when AI extraction is
        disabled, the text is empty or shorter than 10 characters after
        stripping, the HTTP status is not 200, the reply does not contain a
        JSON object, or any error occurs along the way.
    """
    if CLEANER_AI_DISABLED or not text or len(str(text).strip()) < 10:
        return None
    try:
        # Imported lazily so that a missing ``requests`` package only disables
        # extraction (the ImportError is swallowed below) instead of breaking
        # the import of this module.
        import requests

        raw = str(text).strip()[:EXTRACT_TEXT_MAX_LEN]
        prompt = f"""从以下美伊/中东新闻**全文或摘要**中,提取**报道明确给出的数字与事实**,输出 JSON。规则
1. 仅填写报道中**直接出现、可核对**的数据,不要推测或估算。
2. 无明确依据的字段**必须省略**,不要填 0 或猜。
3. **战损一律按增量**:只填本则报道中「本次/此次/今日/本轮」**新增**的伤亡或损毁数量。若报道只给「累计总数」「迄今共」「total so far」等**不要填写**该字段(避免与库内已有累计值重复叠加)。
4. **攻击地点**:提取双方遭袭的具体地点。美军/盟军基地被打击 → side=us伊朗/亲伊设施被打击 → side=iran。name_keywords 用「中文名|英文名」便于匹配,可填多处。
字段说明:
- summary: 1-2 句中文事实概括≤80 字
- category: deployment|alert|intel|diplomatic|other
- severity: low|medium|high|critical
- 战损(**仅填本则报道的新增增量**,如「此次 5 人丧生」「今日又损 2 架」):
us_personnel_killed, iran_personnel_killed, us_personnel_wounded, iran_personnel_wounded,
us_civilian_killed, iran_civilian_killed, us_civilian_wounded, iran_civilian_wounded,
us_bases_destroyed, iran_bases_destroyed, us_bases_damaged, iran_bases_damaged,
us_aircraft, iran_aircraft, us_warships, iran_warships, us_armor, iran_armor, us_vehicles, iran_vehicles,
us_drones, iran_drones, us_missiles, iran_missiles, us_helicopters, iran_helicopters, us_submarines, iran_submarines,
us_carriers, iran_carriers, us_civilian_ships, iran_civilian_ships, us_airport_port, iran_airport_port
- retaliation_sentiment: 0-100仅当报道涉及伊朗报复/反击情绪时
- wall_street_value: 0-100仅当报道涉及美股/市场时
- key_location_updates: **双方攻击地点**。每项 {{ "name_keywords": "阿萨德|asad|al-asad", "side": "us或iran被打击方", "status": "attacked", "damage_level": 1-3 }}。美军基地例:阿萨德|asad、乌代德|udeid、埃尔比勒|erbil、因吉尔利克|incirlik。伊朗例德黑兰|tehran、布什尔|bushehr、伊斯法罕|isfahan、阿巴斯|abbas、纳坦兹|natanz
原文:
{raw}
直接输出 JSON不要解释"""
        r = requests.post(
            "http://localhost:11434/api/chat",
            json={
                "model": OLLAMA_MODEL,
                "messages": [{"role": "user", "content": prompt}],
                "stream": False,
                "options": {"num_predict": 384},
            },
            timeout=timeout,
        )
        if r.status_code != 200:
            return None

        reply = (r.json().get("message", {}).get("content", "") or "").strip()
        # Drop a Markdown code fence wrapped around the whole reply, if any.
        reply = re.sub(r"^```\w*\s*|\s*```$", "", reply)
        try:
            parsed = json.loads(reply)
        except ValueError:
            # The model may surround the object with explanatory text despite
            # the prompt; retry on the outermost {...} span before giving up.
            start, end = reply.find("{"), reply.rfind("}")
            if start == -1 or end <= start:
                return None
            parsed = json.loads(reply[start:end + 1])
        # Only a JSON object satisfies the declared return type; a list,
        # string or number would make callers' ``.get`` lookups fail.
        return parsed if isinstance(parsed, dict) else None
    except Exception:
        # Deliberate best-effort: network errors, a missing ``requests``
        # package or malformed replies all mean "no AI data for this item".
        return None
# Per-side loss counters; the model reports each one as ``us_<name>`` and
# ``iran_<name>``.
_LOSS_FIELDS = (
    "personnel_killed", "personnel_wounded", "civilian_killed", "civilian_wounded",
    "bases_destroyed", "bases_damaged", "aircraft", "warships", "armor", "vehicles",
    "drones", "missiles", "helicopters", "submarines", "carriers", "civilian_ships",
    "airport_port",
)


def _finite_number(value: Any) -> Optional[float]:
    """Return ``value`` if it is a real, finite int/float; otherwise ``None``."""
    # bool is a subclass of int; a JSON true/false is not a count or a score.
    if isinstance(value, bool) or not isinstance(value, (int, float)):
        return None
    # json.loads accepts NaN/Infinity by default, and int() raises on them.
    if isinstance(value, float) and not math.isfinite(value):
        return None
    return value


def extract_from_news(text: str, timestamp: Optional[str] = None) -> Dict[str, Any]:
    """Extract structured, panel-schema-shaped data from a news text.

    Args:
        text: News text handed to ``_call_ollama_extract``.
        timestamp: Timestamp string stored in the time fields of the result;
            defaults to the current UTC time formatted as
            ``YYYY-MM-DDTHH:MM:SS.000Z``.

    Returns:
        A dict containing only the sections that could be extracted:
        ``situation_update``, ``combat_losses_delta`` (with ``us`` and/or
        ``iran`` sub-dicts), ``retaliation``, ``wall_street`` and
        ``key_location_updates``. The dict is empty when the AI call yields
        nothing usable.
    """
    ts = timestamp or datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%S.000Z")
    out: Dict[str, Any] = {}
    parsed = _call_ollama_extract(text)
    # Guard against a non-dict payload so the ``.get`` lookups below cannot fail.
    if not parsed or not isinstance(parsed, dict):
        return out

    # situation_update
    if parsed.get("summary"):
        out["situation_update"] = {
            "summary": validate_summary(str(parsed["summary"])[:120], 120),
            "category": validate_category(str(parsed.get("category", "other")).lower()),
            "severity": validate_severity(str(parsed.get("severity", "medium")).lower()),
            "timestamp": ts,
        }

    # combat_losses increments (numeric fields only); negatives are clamped to 0.
    losses: Dict[str, Dict[str, int]] = {}
    for side in ("us", "iran"):
        side_losses: Dict[str, int] = {}
        for field in _LOSS_FIELDS:
            amount = _finite_number(parsed.get(f"{side}_{field}"))
            if amount is not None:
                side_losses[field] = max(0, int(amount))
        if side_losses:
            losses[side] = side_losses
    if losses:
        out["combat_losses_delta"] = losses

    # retaliation: accepted only when within 0..100
    sentiment = _finite_number(parsed.get("retaliation_sentiment"))
    if sentiment is not None and 0 <= sentiment <= 100:
        out["retaliation"] = {"value": int(sentiment), "time": ts}

    # wall_street: accepted only when within 0..100
    market = _finite_number(parsed.get("wall_street_value"))
    if market is not None and 0 <= market <= 100:
        out["wall_street"] = {"time": ts, "value": int(market)}

    # key_location_updates: locations reported as attacked
    updates = parsed.get("key_location_updates")
    if isinstance(updates, list):
        valid: List[Dict[str, Any]] = []
        for entry in updates:
            if not isinstance(entry, dict):
                continue
            if not entry.get("name_keywords") or entry.get("side") not in ("us", "iran"):
                continue
            level = _finite_number(entry.get("damage_level"))
            valid.append({
                "name_keywords": str(entry["name_keywords"]),
                "side": entry["side"],
                # A missing, null or empty status falls back to "attacked"
                # rather than being stored as the string "None".
                "status": str(entry.get("status") or "attacked")[:20],
                # Clamp to 1..3; default to 2 when no usable number is given.
                "damage_level": 2 if level is None else min(3, max(1, int(level))),
            })
        if valid:
            out["key_location_updates"] = valid
    return out