Files
usa/crawler/realtime_conflict_service.py
2026-03-02 17:20:31 +08:00

504 lines
18 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# -*- coding: utf-8 -*-
"""
GDELT 实时冲突抓取 + API 服务
核心数据源：GDELT Project，约 15 分钟级更新，含经纬度、事件编码、参与方、事件强度
"""
import os
# Connect to the internet directly so that a system proxy cannot cause
# ProxyError / timeouts (set CRAWLER_USE_PROXY=1 when a proxy is required).
if os.getenv("CRAWLER_USE_PROXY") != "1":
    os.environ.setdefault("NO_PROXY", "*")
import hashlib
import sqlite3
from datetime import datetime
from pathlib import Path
from typing import List, Optional
import asyncio
import logging
import requests
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
# Keep the "uvicorn" logger at INFO level.
logging.getLogger("uvicorn").setLevel(logging.INFO)

# FastAPI application; the CORS middleware accepts any origin and any method.
app = FastAPI(title="GDELT Conflict Service")
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_methods=["*"],
)
# Configuration (every value can be overridden through the environment)
PROJECT_ROOT = Path(__file__).resolve().parent.parent
DB_PATH = os.getenv("DB_PATH", str(PROJECT_ROOT / "server" / "data.db"))
API_BASE = os.getenv("API_BASE", "http://localhost:3001")
QUERY = os.getenv("GDELT_QUERY", "United States Iran military")
MAX_RECORDS = int(os.getenv("GDELT_MAX_RECORDS", "30"))
FETCH_INTERVAL_SEC = int(os.getenv("FETCH_INTERVAL_SEC", "60"))
# Fetch the major world media feeds every minute.
RSS_INTERVAL_SEC = int(os.getenv("RSS_INTERVAL_SEC", "60"))

# Time span: 1h = one hour, 1d = one day, 1week = one week. When unset the
# default is 3 months (which tends to return old articles).
GDELT_TIMESPAN = os.getenv("GDELT_TIMESPAN", "1d")

# Set to 1 to skip GDELT and use only RSS news as the event timeline
# (GDELT may not be reachable from every network).
GDELT_DISABLED = os.getenv("GDELT_DISABLED", "0") == "1"

# Iran as the attack source (default when an event has no coordinates).
IRAN_COORD = [51.3890, 35.6892]  # Tehran [lng, lat]

# Keyword arguments for outgoing requests; unless CRAWLER_USE_PROXY=1 the
# requests bypass the system proxy (avoids ProxyError / proxy timeouts).
_REQ_KW = {
    "timeout": 15,
    "headers": {"User-Agent": "US-Iran-Dashboard/1.0"},
}
if os.getenv("CRAWLER_USE_PROXY") != "1":
    _REQ_KW["proxies"] = {"http": None, "https": None}

# In-memory copy of the most recently fetched events.
EVENT_CACHE: List[dict] = []
# ==========================
# Conflict intensity score (1-10)
# ==========================
def calculate_impact_score(title: str) -> int:
    """Score a headline's conflict intensity on a 1-10 scale.

    The score starts at 1. Each keyword group (English and Chinese terms) that
    occurs in the lower-cased title adds its weight once, and the total is
    capped at 10.

    Args:
        title: Headline text; ``None`` or an empty string scores 1.

    Returns:
        An integer between 1 and 10 inclusive.
    """
    text = (title or "").lower()
    # (keywords, weight) -- a group contributes when any of its keywords is a
    # substring of the title. No keyword may be the empty string: "" is a
    # substring of every title and would add the weight unconditionally.
    weighted_keywords = (
        (("missile", "导弹"), 3),
        (("strike", "袭击", "打击"), 2),
        (("killed", "death", "casualt", "死亡", "伤亡"), 4),
        (("troops", "soldier", "士兵", "军人"), 2),
        # "attack" also covers "attacked" / "attacks".
        (("attack", "攻击"), 3),
        (("nuclear", "核"), 4),
        (("explosion", "blast", "bomb", "爆炸"), 2),
    )
    score = 1 + sum(
        weight
        for keywords, weight in weighted_keywords
        if any(keyword in text for keyword in keywords)
    )
    return min(score, 10)
# Map a severity label to an impact_score.
def _severity_to_score(sev: str) -> int:
    """Translate a severity label into an impact score.

    The label is matched case-insensitively: critical -> 9, high -> 7,
    medium -> 5, low -> 2. Anything else (including ``None`` / "") gives 5.
    """
    label = (sev or "").lower()
    scores = {"critical": 9, "high": 7, "medium": 5, "low": 2}
    try:
        return scores[label]
    except KeyError:
        return 5
# Infer [lng, lat] coordinates from text; used for the RSS -> gdelt_events
# path when GDELT is disabled. Entries are checked in order, first match wins.
_LOC_COORDS = [
    (["阿克罗蒂里", "akrotiri", "塞浦路斯", "cyprus"], (32.98, 34.58)),
    (["巴格拉姆", "bagram", "阿富汗", "afghanistan"], (69.26, 34.95)),
    (["巴格达", "baghdad", "伊拉克", "iraq"], (44.37, 33.31)),
    (["贝鲁特", "beirut", "黎巴嫩", "lebanon"], (35.49, 33.89)),
    (["耶路撒冷", "jerusalem", "特拉维夫", "tel aviv", "以色列", "israel"], (35.21, 31.77)),
    (["阿巴斯港", "bandar abbas", "霍尔木兹", "hormuz"], (56.27, 27.18)),
    (["米纳布", "minab"], (57.08, 27.13)),
    (["德黑兰", "tehran", "伊朗", "iran"], (51.389, 35.689)),
    (["大马士革", "damascus", "叙利亚", "syria"], (36.28, 33.50)),
    (["迪拜", "dubai", "阿联酋", "uae"], (55.27, 25.20)),
    (["沙特", "saudi"], (46.73, 24.71)),
    (["巴基斯坦", "pakistan"], (73.06, 33.72)),
    (["奥斯汀", "austin"], (-97.74, 30.27)),
]


def _infer_coords(text: str) -> tuple:
    """Return ``(lng, lat)`` for the first known place name found in *text*.

    The text is lower-cased and each keyword of ``_LOC_COORDS`` is tested as a
    plain substring, in table order. When nothing matches (or the text is
    empty / ``None``) the ``IRAN_COORD`` default is returned.
    """
    haystack = (text or "").lower()
    for keywords, coords in _LOC_COORDS:
        if any(keyword in haystack for keyword in keywords):
            return (coords[0], coords[1])
    return (IRAN_COORD[0], IRAN_COORD[1])
# ==========================
# Fetch real-time GDELT events
# ==========================
def _parse_article(article: dict) -> Optional[dict]:
    """Convert one GDELT article record into an event dict.

    The title (falling back to ``seendate``) is translated and cleaned for the
    panel, coordinates default to ``IRAN_COORD`` when missing or not numeric,
    the impact score is computed from the raw title, and ``event_id`` is the
    first 24 hex characters of SHA-256 over ``url + seendate``.

    Returns:
        The event dict, or ``None`` when the article has neither a title nor a
        ``seendate``.
    """
    raw_title = article.get("title") or article.get("seendate") or ""
    if not raw_title:
        return None

    from translate_utils import translate_to_chinese
    from cleaner_ai import clean_news_for_panel

    translated = translate_to_chinese(str(raw_title)[:500])
    panel_title = clean_news_for_panel(translated, max_len=150)

    link = article.get("url") or article.get("socialimage") or ""
    seen_at = article.get("seendate") or datetime.utcnow().isoformat()

    # Without coordinates, use Iran (the attack source) as the location.
    latitude, longitude = article.get("lat"), article.get("lng")
    if latitude is None or longitude is None:
        latitude, longitude = IRAN_COORD[1], IRAN_COORD[0]
    try:
        latitude, longitude = float(latitude), float(longitude)
    except (TypeError, ValueError):
        latitude, longitude = IRAN_COORD[1], IRAN_COORD[0]

    score = calculate_impact_score(raw_title)
    digest = hashlib.sha256(f"{link}{seen_at}".encode()).hexdigest()

    event = {
        "event_id": digest[:24],
        "event_time": seen_at,
        "title": panel_title[:500],
        "lat": latitude,
        "lng": longitude,
        "impact_score": score,
        "url": link,
    }
    return event
def fetch_gdelt_events() -> None:
    """Fetch the latest GDELT articles, refresh ``EVENT_CACHE`` and persist them.

    Does nothing when ``GDELT_DISABLED`` is set. Otherwise the GDELT DOC API is
    queried (ArtList / JSON, newest first), every article dict is converted
    with ``_parse_article``, the events are sorted by ``event_time``
    descending, stored in ``EVENT_CACHE``, written to SQLite and the Node API
    is notified. Any failure is reported on stdout and leaves the previous
    cache in place; nothing is raised to the caller.
    """
    global EVENT_CACHE
    if GDELT_DISABLED:
        return

    from urllib.parse import quote

    # Percent-encode the user-configurable parts so characters such as
    # "&", "#" or "+" in GDELT_QUERY cannot break the query string.
    url = (
        "https://api.gdeltproject.org/api/v2/doc/doc"
        f"?query={quote(str(QUERY))}"
        "&mode=ArtList"
        "&format=json"
        f"&maxrecords={MAX_RECORDS}"
        f"&timespan={quote(str(GDELT_TIMESPAN))}"
        "&sort=datedesc"
    )
    try:
        resp = requests.get(url, **_REQ_KW)
        resp.raise_for_status()
        data = resp.json()

        # The payload is either {"articles": [...]} or a bare list.
        articles = data.get("articles", data) if isinstance(data, dict) else data
        if not isinstance(articles, list):
            articles = []

        new_events = []
        for article in articles:
            if not isinstance(article, dict):
                continue
            event = _parse_article(article)
            if event:
                new_events.append(event)

        # Sort by event_time, newest first.
        new_events.sort(key=lambda e: e.get("event_time", ""), reverse=True)
        EVENT_CACHE = new_events

        # Write to SQLite and notify Node.
        _write_to_db(new_events)
        _notify_node()
        print(f"[{datetime.now().strftime('%H:%M:%S')}] GDELT 更新 {len(new_events)} 条事件")
    except Exception as exc:
        # Best effort: keep the service running, but make the failure visible.
        print(f"[{datetime.now().strftime('%H:%M:%S')}] GDELT 抓取失败: {exc}")
def _ensure_table(conn: sqlite3.Connection) -> None:
    """Create the ``gdelt_events`` and ``conflict_stats`` tables if missing.

    Both statements use ``CREATE TABLE IF NOT EXISTS``, so calling this again
    on the same database is harmless. The connection is committed afterwards.
    """
    events_ddl = """
CREATE TABLE IF NOT EXISTS gdelt_events (
event_id TEXT PRIMARY KEY,
event_time TEXT NOT NULL,
title TEXT NOT NULL,
lat REAL NOT NULL,
lng REAL NOT NULL,
impact_score INTEGER NOT NULL,
url TEXT,
created_at TEXT DEFAULT (datetime('now'))
)
"""
    # Single-row table: the CHECK constraint only allows id = 1.
    stats_ddl = """
CREATE TABLE IF NOT EXISTS conflict_stats (
id INTEGER PRIMARY KEY CHECK (id = 1),
total_events INTEGER NOT NULL,
high_impact_events INTEGER NOT NULL,
estimated_casualties INTEGER NOT NULL,
estimated_strike_count INTEGER NOT NULL,
updated_at TEXT NOT NULL
)
"""
    for ddl in (events_ddl, stats_ddl):
        conn.execute(ddl)
    conn.commit()
def _write_to_db(events: List[dict]) -> None:
    """Persist *events* and the derived aggregate rows into the SQLite file.

    Returns immediately when ``DB_PATH`` does not exist. Otherwise the events
    are upserted into ``gdelt_events``, the single ``conflict_stats`` row is
    replaced with figures derived from this batch, and row 1 of ``situation``
    is replaced with a fresh ``updated_at``. Everything is committed together;
    on any error the message is printed and the transaction is rolled back.
    The connection is always closed.
    """
    if not os.path.exists(DB_PATH):
        return

    conn = sqlite3.connect(DB_PATH, timeout=10)
    try:
        _ensure_table(conn)

        event_rows = [
            (
                ev["event_id"],
                ev.get("event_time", ""),
                ev.get("title", ""),
                ev.get("lat", 0),
                ev.get("lng", 0),
                ev.get("impact_score", 1),
                ev.get("url", ""),
            )
            for ev in events
        ]
        conn.executemany(
            "INSERT OR REPLACE INTO gdelt_events (event_id, event_time, title, lat, lng, impact_score, url) VALUES (?, ?, ?, ?, ?, ?, ?)",
            event_rows,
        )

        # Damage-statistics model (for display only).
        high_count = 0
        strike_count = 0
        for ev in events:
            if ev.get("impact_score", 0) >= 7:
                high_count += 1
            lowered_title = (ev.get("title") or "").lower()
            if "strike" in lowered_title or "attack" in lowered_title:
                strike_count += 1
        casualties = min(5000, high_count * 80 + len(events) * 10)  # estimate

        conn.execute(
            "INSERT OR REPLACE INTO conflict_stats (id, total_events, high_impact_events, estimated_casualties, estimated_strike_count, updated_at) VALUES (1, ?, ?, ?, ?, ?)",
            (len(events), high_count, casualties, strike_count, datetime.utcnow().isoformat()),
        )
        # NOTE(review): this replaces situation row 1, storing '{}' as its
        # data -- presumably only updated_at matters to the reader; confirm.
        # The situation table is not created by _ensure_table.
        conn.execute(
            "INSERT OR REPLACE INTO situation (id, data, updated_at) VALUES (1, '{}', ?)",
            (datetime.utcnow().strftime("%Y-%m-%dT%H:%M:%S.000Z"),),
        )
        conn.commit()
    except Exception as e:
        print(f"写入 DB 失败: {e}")
        conn.rollback()
    finally:
        conn.close()
def _notify_node() -> None:
    """POST to ``{API_BASE}/api/crawler/notify``, bypassing any proxy.

    A non-200 response or any exception only prints a warning; nothing is
    raised to the caller.
    """
    try:
        endpoint = f"{API_BASE}/api/crawler/notify"
        direct = {"http": None, "https": None}
        response = requests.post(endpoint, timeout=5, proxies=direct)
        if response.status_code != 200:
            print(" [warn] notify API 失败")
    except Exception as e:
        print(f" [warn] notify API: {e}")
def _rss_to_gdelt_fallback() -> None:
    """Mirror recent ``situation_update`` rows into ``gdelt_events``.

    Only runs when GDELT is disabled and the database file exists, so that the
    map still shows conflict points. The 50 newest rows are converted to
    events: coordinates are inferred from the first 300 characters of the
    summary and the impact score comes from the severity. When at least one
    event results, ``EVENT_CACHE`` is replaced, the events are written to the
    database and the Node API is notified. Failures only print a warning.
    """
    global EVENT_CACHE
    if not GDELT_DISABLED or not os.path.exists(DB_PATH):
        return
    try:
        conn = sqlite3.connect(DB_PATH, timeout=10)
        try:
            rows = conn.execute(
                "SELECT id, timestamp, category, summary, severity FROM situation_update ORDER BY timestamp DESC LIMIT 50"
            ).fetchall()
        finally:
            # Close even when the query fails (e.g. the table is missing),
            # otherwise the connection would leak on every failed run.
            conn.close()

        events = []
        for uid, ts, _category, summary, sev in rows:
            text = summary or ""
            lng, lat = _infer_coords(text[:300])
            events.append({
                "event_id": f"rss_{uid}",
                "event_time": ts,
                "title": text[:500],
                "lat": lat,
                "lng": lng,
                "impact_score": _severity_to_score(sev),
                "url": "",
            })

        if events:
            EVENT_CACHE = events
            _write_to_db(events)
            _notify_node()
    except Exception as e:
        print(f" [warn] RSS→gdelt fallback: {e}")
# ==========================
# RSS news fetch: store articles (deduplicated) -> AI extraction ->
# store panel data -> notify the front end
# ==========================
# Outcome of the latest fetch_news run, reported by the status endpoint.
LAST_FETCH = {"items": 0, "inserted": 0, "error": None}


def fetch_news() -> None:
    """Run one RSS fetch cycle and record its outcome in ``LAST_FETCH``.

    Every fetched item is translated and cleaned in place, the batch is stored
    with de-duplication, new items are written as panel updates and passed to
    the AI extraction step, the RSS -> gdelt fallback runs when GDELT is
    disabled, and the Node API is notified. Any exception (including a failed
    import of the project helpers) is stored in ``LAST_FETCH["error"]`` and
    printed; nothing is raised to the caller.
    """
    try:
        from scrapers.rss_scraper import fetch_all
        from db_writer import write_updates
        from news_storage import save_and_dedup
        from translate_utils import translate_to_chinese
        from cleaner_ai import clean_news_for_panel
        from cleaner_ai import ensure_category, ensure_severity

        LAST_FETCH["error"] = None
        items = fetch_all()

        for item in items:
            title_zh = translate_to_chinese(item.get("title", "") or "")
            summary_zh = translate_to_chinese(item.get("summary", "") or item.get("title", ""))
            item["title"] = clean_news_for_panel(title_zh, max_len=80)
            item["summary"] = clean_news_for_panel(summary_zh or title_zh, max_len=120)
            item["category"] = ensure_category(item.get("category", "other"))
            item["severity"] = ensure_severity(item.get("severity", "medium"))
            item["source"] = item.get("source") or "rss"

        # 1. History de-duplication: article content goes into news_content
        #    (a separate table, convenient for later consumers).
        new_items, n_news = save_and_dedup(items, db_path=DB_PATH)

        # 2. Panel display: new articles go into situation_update
        #    (recentUpdates on the front end).
        if new_items:
            n_panel = write_updates(new_items)
        else:
            n_panel = 0
        LAST_FETCH["items"] = len(items)
        LAST_FETCH["inserted"] = n_news

        # 3. AI extraction + merge into combat_losses / key_location etc.
        if new_items:
            _extract_and_merge_panel_data(new_items)

        # With GDELT disabled, fill gdelt_events from RSS so the map still
        # shows conflict points.
        if GDELT_DISABLED:
            _rss_to_gdelt_fallback()

        _notify_node()
        print(f"[{datetime.now().strftime('%H:%M:%S')}] RSS 抓取 {len(items)} 条,去重后新增 {n_news} 条资讯,面板 {n_panel}")
    except Exception as e:
        LAST_FETCH["error"] = str(e)
        print(f"[{datetime.now().strftime('%H:%M:%S')}] 新闻抓取失败: {e}")
def _extract_and_merge_panel_data(items: list) -> None:
    """Extract panel data from news items and merge it into the database.

    Does nothing for an empty list or a missing database file. The extractor
    is chosen from the environment: the DashScope one when
    ``DASHSCOPE_API_KEY`` is non-blank (first 10 items), the rule-based one
    when ``CLEANER_AI_DISABLED=1`` (first 25 items), otherwise the default AI
    extractor (first 10 items). Items whose title + summary is shorter than 20
    characters are skipped. The Node API is notified when at least one merge
    reports success. Any failure only prints a warning.
    """
    if not (items and os.path.exists(DB_PATH)):
        return
    try:
        from db_merge import merge

        if os.environ.get("DASHSCOPE_API_KEY", "").strip():
            from extractor_dashscope import extract_from_news
            limit = 10
        elif os.environ.get("CLEANER_AI_DISABLED", "0") == "1":
            from extractor_rules import extract_from_news
            limit = 25
        else:
            from extractor_ai import extract_from_news
            limit = 10
        from datetime import timezone

        merged_any = False
        for item in items[:limit]:
            text = (item.get("title", "") or "") + " " + (item.get("summary", "") or "")
            if len(text.strip()) < 20:
                continue

            # Publication time as "YYYY-MM-DDTHH:MM:SS.000Z"; stays None when
            # it is absent or cannot be parsed.
            ts = None
            published = item.get("published")
            if published:
                try:
                    when = (
                        datetime.fromisoformat(published.replace("Z", "+00:00"))
                        if isinstance(published, str)
                        else published
                    )
                    # Aware values are converted to UTC; naive ones are
                    # formatted unchanged.
                    if when.tzinfo:
                        when = when.astimezone(timezone.utc)
                    ts = when.strftime("%Y-%m-%dT%H:%M:%S.000Z")
                except Exception:
                    pass

            extracted = extract_from_news(text, timestamp=ts)
            if extracted and merge(extracted, db_path=DB_PATH):
                merged_any = True

        if merged_any:
            _notify_node()
    except Exception as e:
        print(f" [warn] AI 面板数据提取/合并: {e}")
# ==========================
# Scheduled job (asyncio background task; avoids the APScheduler executor
# shutdown race)
# ==========================
# Handle of the background task created at startup and cancelled at shutdown.
_bg_task: Optional[asyncio.Task] = None


async def _periodic_fetch() -> None:
    """Run the RSS and GDELT fetches forever, sleeping between rounds.

    Both blocking fetch functions run in the default executor. A cancellation
    received while a fetch is awaited ends the loop; any other exception is
    printed as a warning. The pause between rounds is the smaller of
    ``RSS_INTERVAL_SEC`` and ``FETCH_INTERVAL_SEC``.
    """
    loop = asyncio.get_running_loop()
    pause = min(RSS_INTERVAL_SEC, FETCH_INTERVAL_SEC)
    while True:
        try:
            for job in (fetch_news, fetch_gdelt_events):
                await loop.run_in_executor(None, job)
        except asyncio.CancelledError:
            break
        except Exception as e:
            print(f" [warn] 定时抓取: {e}")
        await asyncio.sleep(pause)
# ==========================
# API endpoints
# ==========================
@app.post("/crawler/backfill")
def crawler_backfill():
    """Re-parse ``situation_update`` rows and merge the extracted data.

    Used to repair history whose losses / retaliation data was never
    extracted. The 50 newest rows are read; for each row whose
    category + summary is at least 20 characters, the extractor (rule-based
    when ``CLEANER_AI_DISABLED=1``, otherwise the AI one) is run and the
    result merged into the database.

    Returns:
        ``{"ok": True, "processed": <rows read>, "merged": <successful
        merges>}`` on success, or ``{"ok": False, "error": <message>}`` when
        the database is missing or the run fails.
    """
    if not os.path.exists(DB_PATH):
        return {"ok": False, "error": "db not found"}
    try:
        from db_merge import merge
        if os.environ.get("CLEANER_AI_DISABLED", "0") == "1":
            from extractor_rules import extract_from_news
        else:
            from extractor_ai import extract_from_news

        conn = sqlite3.connect(DB_PATH, timeout=10)
        try:
            rows = conn.execute(
                "SELECT id, timestamp, category, summary FROM situation_update ORDER BY timestamp DESC LIMIT 50"
            ).fetchall()
        finally:
            # Close even when the query raises, so the connection cannot leak.
            conn.close()

        merged = 0
        for _uid, ts, cat, summary in rows:
            text = ((cat or "") + " " + (summary or "")).strip()
            if len(text) < 20:
                continue
            try:
                extracted = extract_from_news(text, timestamp=ts)
                if extracted and merge(extracted, db_path=DB_PATH):
                    merged += 1
            except Exception as exc:
                # One bad row must not abort the backfill, but report it
                # instead of dropping it silently.
                print(f" [warn] backfill: {exc}")

        _notify_node()
        return {"ok": True, "processed": len(rows), "merged": merged}
    except Exception as e:
        return {"ok": False, "error": str(e)}
@app.get("/crawler/status")
def crawler_status():
    """Report crawler status, for troubleshooting the data-update pipeline.

    Returns:
        A dict with the database path, whether the file exists, the row count
        of ``situation_update`` (0 when the file is missing or the count
        cannot be read) and the item count, inserted count and error of the
        last RSS fetch.
    """
    db_ok = os.path.exists(DB_PATH)
    total = 0
    if db_ok:
        try:
            conn = sqlite3.connect(DB_PATH, timeout=3)
            try:
                total = conn.execute("SELECT COUNT(*) FROM situation_update").fetchone()[0]
            finally:
                # Close even when the table is missing or the DB is locked.
                conn.close()
        except sqlite3.Error:
            # Status is best effort: an unreadable database reports 0 rows.
            pass
    return {
        "db_path": DB_PATH,
        "db_exists": db_ok,
        "situation_update_count": total,
        "last_fetch_items": LAST_FETCH.get("items", 0),
        "last_fetch_inserted": LAST_FETCH.get("inserted", 0),
        "last_fetch_error": LAST_FETCH.get("error"),
    }
@app.get("/events")
def get_events():
    """Return the cached events together with the stored conflict statistics.

    ``updated_at`` is the current UTC time of the request.
    """
    payload = {"updated_at": datetime.utcnow().isoformat()}
    payload["count"] = len(EVENT_CACHE)
    payload["events"] = EVENT_CACHE
    payload["conflict_stats"] = _get_conflict_stats()
    return payload
def _get_conflict_stats() -> dict:
    """Read the single ``conflict_stats`` row (id = 1) from the database.

    Returns:
        A dict with ``total_events``, ``high_impact_events``,
        ``estimated_casualties`` and ``estimated_strike_count``. All four are
        0 when the database file is missing, the row does not exist, or the
        query fails.
    """
    fields = (
        "total_events",
        "high_impact_events",
        "estimated_casualties",
        "estimated_strike_count",
    )
    empty = dict.fromkeys(fields, 0)
    if not os.path.exists(DB_PATH):
        return empty
    try:
        conn = sqlite3.connect(DB_PATH, timeout=5)
        try:
            row = conn.execute("SELECT total_events, high_impact_events, estimated_casualties, estimated_strike_count FROM conflict_stats WHERE id = 1").fetchone()
        finally:
            # Close even when the query raises, so the connection cannot leak.
            conn.close()
    except sqlite3.Error:
        # Best effort: an unreadable database yields the all-zero defaults.
        return empty
    if not row:
        return empty
    return dict(zip(fields, row))
@app.on_event("startup")
async def startup():
    """Run one RSS fetch and one GDELT fetch, then start the periodic task.

    Both initial fetches run in the default executor and are awaited before
    the background task is created and stored in ``_bg_task``.
    """
    global _bg_task
    loop = asyncio.get_running_loop()
    for job in (fetch_news, fetch_gdelt_events):
        await loop.run_in_executor(None, job)
    _bg_task = asyncio.create_task(_periodic_fetch())
@app.on_event("shutdown")
async def shutdown():
    """Cancel the periodic background task, if it is still running, and wait for it."""
    global _bg_task
    task = _bg_task
    if not task or task.done():
        return
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        # Expected outcome of the cancellation.
        pass
if __name__ == "__main__":
    # Script entry point: serve the app with uvicorn on all interfaces, port 8000.
    import uvicorn

    uvicorn.run(app, host="0.0.0.0", port=8000)