Files
usa/crawler/extractor_dashscope.py
2026-03-03 13:02:28 +08:00

127 lines
6.2 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# -*- coding: utf-8 -*-
"""
阿里云 DashScope通义千问提取面板结构化数据
从新闻文本中提取战损、报复指数、基地状态等,供 db_merge 落库
API Key 通过环境变量 DASHSCOPE_API_KEY 配置
"""
import json
import os
import re
from datetime import datetime, timezone
from typing import Any, Dict, Optional
from panel_schema import validate_category, validate_severity, validate_summary
EXTRACT_TEXT_MAX_LEN = int(os.environ.get("EXTRACT_TEXT_MAX_LEN", "4000"))
def _call_dashscope_extract(text: str, timeout: int = 15) -> Optional[Dict[str, Any]]:
"""调用阿里云 DashScope 从新闻全文中提取精确结构化数据,仅填写报道明确给出的数字与事实。"""
api_key = os.environ.get("DASHSCOPE_API_KEY", "").strip()
if not api_key or not text or len(str(text).strip()) < 10:
return None
try:
import dashscope
from http import HTTPStatus
dashscope.api_key = api_key
raw = str(text).strip()[:EXTRACT_TEXT_MAX_LEN]
prompt = f"""从以下美伊/中东新闻**全文或摘要**中,提取**报道明确给出的数字与事实**,输出 JSON。规则
1. 仅填写报道中**直接出现、可核对**的数据,不要推测或估算。
2. 无明确依据的字段**必须省略**,不要填 0 或猜。
3. **战损一律按增量**:只填本则报道中「本次/此次/今日」**新增**数量。报道若只给「累计总数」「迄今共」**不要填**该字段。
4. **攻击地点**:提取双方遭袭地点。美军/盟军基地被打击 → side=us伊朗/亲伊设施被打击 → side=iran。name_keywords 用「中文|英文」,可填多处。
字段:
- summary: 1-2 句中文事实概括≤80 字
- category: deployment|alert|intel|diplomatic|other
- severity: low|medium|high|critical
- 战损(**仅填本则报道的新增增量**: us_personnel_killed, iran_personnel_killed, us_personnel_wounded, iran_personnel_wounded, us_civilian_killed, iran_civilian_killed, us_civilian_wounded, iran_civilian_wounded, us_bases_destroyed, iran_bases_destroyed, us_bases_damaged, iran_bases_damaged, us_aircraft, iran_aircraft, us_warships, iran_warships, us_armor, iran_armor, us_vehicles, iran_vehicles, us_drones, iran_drones, us_missiles, iran_missiles, us_helicopters, iran_helicopters, us_submarines, iran_submarines, us_carriers, iran_carriers, us_civilian_ships, iran_civilian_ships, us_airport_port, iran_airport_port
- retaliation_sentiment: 0-100仅当报道涉及伊朗报复情绪时
- wall_street_value: 0-100仅当报道涉及美股/市场时)
- key_location_updates: **双方攻击地点**。每项 {{"name_keywords":"阿萨德|asad","side":"us或iran被打击方","status":"attacked","damage_level":1-3}}。美军基地:阿萨德|asad、乌代德|udeid、埃尔比勒|erbil、因吉尔利克|incirlik。伊朗德黑兰|tehran、布什尔|bushehr、伊斯法罕|isfahan、阿巴斯|abbas、纳坦兹|natanz
原文:
{raw}
直接输出 JSON不要其他解释"""
response = dashscope.Generation.call(
model="qwen-turbo",
messages=[{"role": "user", "content": prompt}],
result_format="message",
max_tokens=512,
)
if response.status_code != HTTPStatus.OK:
return None
raw = (response.output.get("choices", [{}])[0].get("message", {}).get("content", "") or "").strip()
raw = re.sub(r"^```\w*\s*|\s*```$", "", raw)
return json.loads(raw)
except Exception:
return None
def extract_from_news(text: str, timestamp: Optional[str] = None) -> Dict[str, Any]:
"""
从新闻文本提取结构化数据,符合面板 schema
返回: { situation_update?, combat_losses_delta?, retaliation?, wall_street?, key_location_updates? }
"""
ts = timestamp or datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%S.000Z")
out: Dict[str, Any] = {}
parsed = _call_dashscope_extract(text)
if not parsed:
return out
if parsed.get("summary"):
out["situation_update"] = {
"summary": validate_summary(str(parsed["summary"])[:120], 120),
"category": validate_category(str(parsed.get("category", "other")).lower()),
"severity": validate_severity(str(parsed.get("severity", "medium")).lower()),
"timestamp": ts,
}
loss_us = {}
loss_ir = {}
for k in ["personnel_killed", "personnel_wounded", "civilian_killed", "civilian_wounded",
"bases_destroyed", "bases_damaged", "aircraft", "warships", "armor", "vehicles",
"drones", "missiles", "helicopters", "submarines", "carriers", "civilian_ships", "airport_port"]:
uk, ik = f"us_{k}", f"iran_{k}"
if uk in parsed and isinstance(parsed[uk], (int, float)):
loss_us[k] = max(0, int(parsed[uk]))
if ik in parsed and isinstance(parsed[ik], (int, float)):
loss_ir[k] = max(0, int(parsed[ik]))
if loss_us or loss_ir:
out["combat_losses_delta"] = {}
if loss_us:
out["combat_losses_delta"]["us"] = loss_us
if loss_ir:
out["combat_losses_delta"]["iran"] = loss_ir
if "retaliation_sentiment" in parsed:
v = parsed["retaliation_sentiment"]
if isinstance(v, (int, float)) and 0 <= v <= 100:
out["retaliation"] = {"value": int(v), "time": ts}
if "wall_street_value" in parsed:
v = parsed["wall_street_value"]
if isinstance(v, (int, float)) and 0 <= v <= 100:
out["wall_street"] = {"time": ts, "value": int(v)}
if "key_location_updates" in parsed and isinstance(parsed["key_location_updates"], list):
valid = []
for u in parsed["key_location_updates"]:
if isinstance(u, dict) and u.get("name_keywords") and u.get("side") in ("us", "iran"):
valid.append({
"name_keywords": str(u["name_keywords"]),
"side": u["side"],
"status": str(u.get("status", "attacked"))[:20],
"damage_level": min(3, max(1, int(u["damage_level"]))) if isinstance(u.get("damage_level"), (int, float)) else 2,
})
if valid:
out["key_location_updates"] = valid
return out