Files
usa/crawler/extractor_ai.py
2026-03-02 23:21:07 +08:00

121 lines
5.5 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# -*- coding: utf-8 -*-
"""
从新闻文本中 AI 提取结构化数据,映射到面板 schema
输出符合 panel_schema 的字段,供 db_merge 写入
"""
import json
import os
import re
from datetime import datetime, timezone
from typing import Any, Dict, List, Optional
from panel_schema import validate_category, validate_severity, validate_summary
# Set the environment variable CLEANER_AI_DISABLED to exactly "1" to turn AI
# extraction off; any other value (or leaving it unset) keeps it enabled.
CLEANER_AI_DISABLED = os.getenv("CLEANER_AI_DISABLED", "0") == "1"

# Name of the Ollama model to query; overridable through the environment.
OLLAMA_MODEL = os.getenv("OLLAMA_MODEL", "llama3.1")
def _call_ollama_extract(text: str, timeout: int = 10) -> Optional[Dict[str, Any]]:
    """Ask the local Ollama chat endpoint to extract structured fields from news text.

    A Chinese-language prompt (embedding at most the first 800 characters of
    ``text``) is POSTed to ``http://localhost:11434/api/chat`` using the model
    named by ``OLLAMA_MODEL``. The reply is expected to be a JSON object that
    contains only the fields the article clearly supports.

    Args:
        text: Raw news text to analyse.
        timeout: Timeout in seconds passed to ``requests.post``.

    Returns:
        The decoded JSON object, or ``None`` when extraction is disabled via
        ``CLEANER_AI_DISABLED``, the text is empty or shorter than 10
        characters after stripping, the HTTP status is not 200, the request or
        JSON decoding fails, or the decoded value is not a JSON object.
    """
    if CLEANER_AI_DISABLED or not text or len(str(text).strip()) < 10:
        return None

    # Build the prompt outside the try block so a formatting mistake surfaces
    # instead of being swallowed by the best-effort handler below.
    # The JSON example on the key_location_updates line uses doubled braces:
    # inside an f-string, single braces would be parsed as a replacement field
    # with an invalid format spec and raise ValueError on every call.
    prompt = f"""从以下美伊/中东新闻中提取可推断的数值,输出 JSON仅包含有明确依据的字段。无依据则省略该字段。
要求:
- summary: 1-2句中文事实≤80字
- category: deployment|alert|intel|diplomatic|other
- severity: low|medium|high|critical
- 战损(仅当新闻明确提及数字时填写,格式 us_XXX / iran_XXX:
us_personnel_killed, iran_personnel_killed, us_personnel_wounded, iran_personnel_wounded,
us_civilian_killed, iran_civilian_killed, us_civilian_wounded, iran_civilian_wounded,
us_bases_destroyed, iran_bases_destroyed, us_bases_damaged, iran_bases_damaged.
重要bases_* 仅指已确认损毁/受损的基地数量;"军事目标"/targets 等泛指不是基地,若报道只说"X个军事目标遭袭"而无具体基地名,不填写 bases_*
us_aircraft, iran_aircraft, us_warships, iran_warships, us_armor, iran_armor, us_vehicles, iran_vehicles,
us_drones, iran_drones, us_missiles, iran_missiles, us_helicopters, iran_helicopters, us_submarines, iran_submarines
- retaliation_sentiment: 0-100仅当新闻涉及伊朗报复情绪时
- wall_street_value: 0-100仅当新闻涉及美股/市场反应时
- key_location_updates: 当新闻提及具体基地/地点遭袭时,数组项 {{ "name_keywords": "asad|阿萨德|assad", "side": "us", "status": "attacked", "damage_level": 1-3 }}
原文:{str(text)[:800]}
直接输出 JSON不要解释"""

    try:
        # Imported lazily so the module still loads when requests is missing.
        import requests

        r = requests.post(
            "http://localhost:11434/api/chat",
            json={
                "model": OLLAMA_MODEL,
                "messages": [{"role": "user", "content": prompt}],
                "stream": False,
                "options": {"num_predict": 256},
            },
            timeout=timeout,
        )
        if r.status_code != 200:
            return None
        raw = (r.json().get("message", {}).get("content", "") or "").strip()
        # Strip a leading ```lang fence and a trailing ``` fence, if present.
        raw = re.sub(r"^```\w*\s*|\s*```$", "", raw)
        parsed = json.loads(raw)
    except Exception:
        # Deliberate best-effort: a missing dependency, network failure or
        # malformed reply all mean "no AI extraction available".
        return None

    # Callers treat the result as a mapping; reject arrays, strings, numbers.
    return parsed if isinstance(parsed, dict) else None
# Loss categories the model may report per side, as ``us_<name>`` / ``iran_<name>``.
_LOSS_FIELDS = (
    "personnel_killed",
    "personnel_wounded",
    "civilian_killed",
    "civilian_wounded",
    "bases_destroyed",
    "bases_damaged",
    "aircraft",
    "warships",
    "armor",
    "vehicles",
    "drones",
    "missiles",
    "helicopters",
    "submarines",
)


def _finite_int(value: Any) -> Optional[int]:
    """Return ``value`` truncated to int, or None if it is not a finite real number."""
    # bool is a subclass of int, but a JSON true/false is not a count or score.
    if isinstance(value, bool) or not isinstance(value, (int, float)):
        return None
    try:
        return int(value)
    except (ValueError, OverflowError):
        # json.loads accepts NaN / Infinity literals, which int() rejects.
        return None


def extract_from_news(text: str, timestamp: Optional[str] = None) -> Dict[str, Any]:
    """Extract panel-ready structured data from a news text via the AI extractor.

    Args:
        text: Raw news text, forwarded to ``_call_ollama_extract``.
        timestamp: Timestamp string to stamp on the produced records. When
            omitted, the current UTC time formatted as
            ``%Y-%m-%dT%H:%M:%S.000Z`` is used.

    Returns:
        A dict holding only the sections the model output supports; it is
        empty when the extractor yields nothing usable. Possible keys:

        - ``situation_update``: ``summary`` / ``category`` / ``severity``
          (passed through the ``panel_schema`` validators) and ``timestamp``.
        - ``combat_losses_delta``: ``{"us": {...}, "iran": {...}}`` with
          non-negative integer counts; a side with no counts is omitted.
        - ``retaliation``: ``{"value", "time"}`` for a 0-100 sentiment score.
        - ``wall_street``: ``{"time", "value"}`` for a 0-100 market score.
        - ``key_location_updates``: list of attacked-location records with
          ``name_keywords``, ``side``, ``status`` and ``damage_level`` (1-3).
    """
    out: Dict[str, Any] = {}
    parsed = _call_ollama_extract(text)
    # The model output is untrusted: only a non-empty JSON object is usable.
    if not parsed or not isinstance(parsed, dict):
        return out

    ts = timestamp or datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%S.000Z")

    # situation_update
    if parsed.get("summary"):
        out["situation_update"] = {
            "summary": validate_summary(str(parsed["summary"])[:120], 120),
            # ``or`` makes an explicit JSON null fall back to the default
            # instead of being stringified to "none".
            "category": validate_category(str(parsed.get("category") or "other").lower()),
            "severity": validate_severity(str(parsed.get("severity") or "medium").lower()),
            "timestamp": ts,
        }

    # combat_losses delta (numeric fields only)
    losses: Dict[str, Dict[str, int]] = {}
    for side in ("us", "iran"):
        side_losses: Dict[str, int] = {}
        for field in _LOSS_FIELDS:
            count = _finite_int(parsed.get(f"{side}_{field}"))
            if count is not None:
                side_losses[field] = max(0, count)
        if side_losses:
            losses[side] = side_losses
    if losses:
        out["combat_losses_delta"] = losses

    # retaliation: the range check uses the raw value so e.g. 100.5 is rejected
    sentiment = parsed.get("retaliation_sentiment")
    sentiment_int = _finite_int(sentiment)
    if sentiment_int is not None and 0 <= sentiment <= 100:
        out["retaliation"] = {"value": sentiment_int, "time": ts}

    # wall_street
    market = parsed.get("wall_street_value")
    market_int = _finite_int(market)
    if market_int is not None and 0 <= market <= 100:
        out["wall_street"] = {"time": ts, "value": market_int}

    # key_location_updates (bases / locations reported as attacked)
    updates = parsed.get("key_location_updates")
    if isinstance(updates, list):
        valid = []
        for u in updates:
            if not isinstance(u, dict):
                continue
            if not u.get("name_keywords") or u.get("side") not in ("us", "iran"):
                continue
            level = _finite_int(u.get("damage_level"))
            valid.append({
                "name_keywords": str(u["name_keywords"]),
                "side": u["side"],
                "status": str(u.get("status", "attacked"))[:20],
                # Clamp to 1-3; a missing or non-numeric level defaults to 2.
                "damage_level": 2 if level is None else min(3, max(1, level)),
            })
        if valid:
            out["key_location_updates"] = valid

    return out