504 lines
18 KiB
Python
504 lines
18 KiB
Python
# -*- coding: utf-8 -*-
"""
GDELT real-time conflict fetcher + API service.

Core data source: the GDELT Project (~15-minute update cadence), providing
coordinates, event codes, participants and event intensity.
"""
import os

# Connect directly to the internet, bypassing the system proxy, to avoid
# ProxyError / timeouts (set CRAWLER_USE_PROXY=1 when a proxy is required).
if os.environ.get("CRAWLER_USE_PROXY") != "1":
    os.environ.setdefault("NO_PROXY", "*")

import hashlib
import sqlite3
from datetime import datetime
from pathlib import Path
from typing import List, Optional

import asyncio
import logging
import requests
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware

logging.getLogger("uvicorn").setLevel(logging.INFO)
app = FastAPI(title="GDELT Conflict Service")
app.add_middleware(CORSMiddleware, allow_origins=["*"], allow_methods=["*"])

# Configuration (all overridable via environment variables)
PROJECT_ROOT = Path(__file__).resolve().parent.parent
DB_PATH = os.environ.get("DB_PATH", str(PROJECT_ROOT / "server" / "data.db"))
API_BASE = os.environ.get("API_BASE", "http://localhost:3001")
QUERY = os.environ.get("GDELT_QUERY", "United States Iran military")
MAX_RECORDS = int(os.environ.get("GDELT_MAX_RECORDS", "30"))
FETCH_INTERVAL_SEC = int(os.environ.get("FETCH_INTERVAL_SEC", "60"))
RSS_INTERVAL_SEC = int(os.environ.get("RSS_INTERVAL_SEC", "60"))  # fetch major world media every minute
# Time span: 1h = 1 hour, 1d = 1 day, 1week = 1 week; if unset GDELT defaults
# to 3 months (which tends to return stale articles).
GDELT_TIMESPAN = os.environ.get("GDELT_TIMESPAN", "1d")
# Set to 1 to skip GDELT and use RSS news only as the event feed
# (GDELT may be unreachable from some regions).
GDELT_DISABLED = os.environ.get("GDELT_DISABLED", "0") == "1"

# Iranian attack origin (default when an event carries no coordinates)
IRAN_COORD = [51.3890, 35.6892]  # Tehran [lng, lat]

# Shared request kwargs: connect directly, not through the system proxy
# (avoids ProxyError / proxy timeouts).
_REQ_KW = {"timeout": 15, "headers": {"User-Agent": "US-Iran-Dashboard/1.0"}}
if os.environ.get("CRAWLER_USE_PROXY") != "1":
    _REQ_KW["proxies"] = {"http": None, "https": None}

# In-memory cache of the most recently fetched events, served by /events.
EVENT_CACHE: List[dict] = []
|
||
# ==========================
# Conflict intensity scoring (1-10)
# ==========================
def calculate_impact_score(title: str) -> int:
    """Heuristic conflict-intensity score for a headline.

    Starts at 1 and adds a fixed weight for each keyword group present in
    *title* (case-insensitive, English and Chinese terms), capped at 10.
    """
    rules = (
        (("missile", "导弹"), 3),
        (("strike", "袭击", "打击"), 2),
        (("killed", "death", "casualt", "死亡", "伤亡"), 4),
        (("troops", "soldier", "士兵", "军人"), 2),
        (("attack", "attacked", "攻击"), 3),
        (("nuclear", "核"), 4),
        (("explosion", "blast", "bomb", "爆炸"), 2),
    )
    text = (title or "").lower()
    # Each group contributes its weight at most once, regardless of how
    # many of its keywords appear.
    score = 1 + sum(weight for keywords, weight in rules
                    if any(kw in text for kw in keywords))
    return min(score, 10)
|
||
|
||
|
||
# 根据 severity 映射到 impact_score
|
||
def _severity_to_score(sev: str) -> int:
|
||
m = {"critical": 9, "high": 7, "medium": 5, "low": 2}
|
||
return m.get((sev or "").lower(), 5)
|
||
|
||
|
||
# Keyword -> (lng, lat) table used to infer coordinates from free text, for
# mapping RSS items into gdelt_events when GDELT is disabled.
# Entries are scanned in order by _infer_coords, so earlier (more specific)
# entries take precedence over broad country names listed later.
_LOC_COORDS = [
    (["阿克罗蒂里", "akrotiri", "塞浦路斯", "cyprus"], (32.98, 34.58)),
    (["巴格拉姆", "bagram", "阿富汗", "afghanistan"], (69.26, 34.95)),
    (["巴格达", "baghdad", "伊拉克", "iraq"], (44.37, 33.31)),
    (["贝鲁特", "beirut", "黎巴嫩", "lebanon"], (35.49, 33.89)),
    (["耶路撒冷", "jerusalem", "特拉维夫", "tel aviv", "以色列", "israel"], (35.21, 31.77)),
    (["阿巴斯港", "bandar abbas", "霍尔木兹", "hormuz"], (56.27, 27.18)),
    (["米纳布", "minab"], (57.08, 27.13)),
    (["德黑兰", "tehran", "伊朗", "iran"], (51.389, 35.689)),
    (["大马士革", "damascus", "叙利亚", "syria"], (36.28, 33.50)),
    (["迪拜", "dubai", "阿联酋", "uae"], (55.27, 25.20)),
    (["沙特", "saudi"], (46.73, 24.71)),
    (["巴基斯坦", "pakistan"], (73.06, 33.72)),
    (["奥斯汀", "austin"], (-97.74, 30.27)),
]
|
||
|
||
|
||
def _infer_coords(text: str) -> tuple:
    """Infer (lng, lat) from keywords in *text*; first match in _LOC_COORDS
    wins, and unmatched text falls back to the Tehran default."""
    haystack = (text or "").lower()
    for keywords, coords in _LOC_COORDS:
        if any(kw in haystack for kw in keywords):
            return coords
    return (IRAN_COORD[0], IRAN_COORD[1])
|
||
|
||
|
||
# ==========================
# Fetch GDELT real-time events
# ==========================
def _parse_article(article: dict) -> Optional[dict]:
    """Normalize one GDELT article dict into an internal event record.

    Returns None when no usable title exists. The title is translated and
    cleaned via project helpers; events lacking valid coordinates default
    to Tehran (the assumed attack origin).
    """
    raw_title = article.get("title") or article.get("seendate") or ""
    if not raw_title:
        return None
    from translate_utils import translate_to_chinese
    from cleaner_ai import clean_news_for_panel

    display_title = clean_news_for_panel(
        translate_to_chinese(str(raw_title)[:500]), max_len=150
    )
    link = article.get("url") or article.get("socialimage") or ""
    seen = article.get("seendate") or datetime.utcnow().isoformat()
    # Missing or malformed coordinates both collapse to the Tehran default:
    # float(None) raises TypeError, junk strings raise ValueError.
    try:
        lat = float(article.get("lat"))
        lng = float(article.get("lng"))
    except (TypeError, ValueError):
        lat, lng = IRAN_COORD[1], IRAN_COORD[0]
    # Score from the raw (untranslated) title so English keywords match.
    return {
        "event_id": hashlib.sha256(f"{link}{seen}".encode()).hexdigest()[:24],
        "event_time": seen,
        "title": display_title[:500],
        "lat": lat,
        "lng": lng,
        "impact_score": calculate_impact_score(raw_title),
        "url": link,
    }
|
||
|
||
|
||
def fetch_gdelt_events() -> None:
    """Fetch recent GDELT articles, refresh EVENT_CACHE, persist and notify.

    No-op when GDELT_DISABLED is set. Failures are logged (not raised) so
    the periodic scheduler keeps running.
    """
    if GDELT_DISABLED:
        return
    # BUGFIX: the query string previously contained "×pan=..." — a mangled
    # "&timespan" (HTML-entity corruption of "&times") — so GDELT ignored the
    # timespan and fell back to its stale 3-month default window. Using
    # `params=` also URL-encodes the space-separated QUERY correctly.
    params = {
        "query": QUERY,
        "mode": "ArtList",
        "format": "json",
        "maxrecords": MAX_RECORDS,
        "timespan": GDELT_TIMESPAN,
        "sort": "datedesc",
    }
    try:
        resp = requests.get(
            "https://api.gdeltproject.org/api/v2/doc/doc", params=params, **_REQ_KW
        )
        resp.raise_for_status()
        data = resp.json()
        # The API may return {"articles": [...]} or a bare list; anything
        # else degrades to an empty result.
        articles = data.get("articles", data) if isinstance(data, dict) else (data if isinstance(data, list) else [])
        if not isinstance(articles, list):
            articles = []
        new_events = [
            ev
            for ev in (_parse_article(a) for a in articles if isinstance(a, dict))
            if ev
        ]
        # Sort by event_time, newest first.
        new_events.sort(key=lambda e: e.get("event_time", ""), reverse=True)
        global EVENT_CACHE
        EVENT_CACHE = new_events
        # Persist to SQLite and notify the Node backend.
        _write_to_db(new_events)
        _notify_node()
        print(f"[{datetime.now().strftime('%H:%M:%S')}] GDELT 更新 {len(new_events)} 条事件")
    except Exception as e:
        # Best-effort: log instead of crashing the scheduler. The original
        # bare `pass` silently hid connectivity and parsing failures.
        print(f" [warn] GDELT fetch failed: {e}")
|
||
|
||
|
||
def _ensure_table(conn: sqlite3.Connection) -> None:
|
||
conn.execute("""
|
||
CREATE TABLE IF NOT EXISTS gdelt_events (
|
||
event_id TEXT PRIMARY KEY,
|
||
event_time TEXT NOT NULL,
|
||
title TEXT NOT NULL,
|
||
lat REAL NOT NULL,
|
||
lng REAL NOT NULL,
|
||
impact_score INTEGER NOT NULL,
|
||
url TEXT,
|
||
created_at TEXT DEFAULT (datetime('now'))
|
||
)
|
||
""")
|
||
conn.execute("""
|
||
CREATE TABLE IF NOT EXISTS conflict_stats (
|
||
id INTEGER PRIMARY KEY CHECK (id = 1),
|
||
total_events INTEGER NOT NULL,
|
||
high_impact_events INTEGER NOT NULL,
|
||
estimated_casualties INTEGER NOT NULL,
|
||
estimated_strike_count INTEGER NOT NULL,
|
||
updated_at TEXT NOT NULL
|
||
)
|
||
""")
|
||
conn.commit()
|
||
|
||
|
||
def _write_to_db(events: List[dict]) -> None:
    """Persist events and derived conflict stats to SQLite.

    Returns silently when the DB file does not exist yet. Event/stat writes
    are committed first; the `situation` touch (a table created elsewhere,
    presumably by the Node server — not by _ensure_table) is guarded
    separately so its absence cannot roll back the event data.
    """
    if not os.path.exists(DB_PATH):
        return
    conn = sqlite3.connect(DB_PATH, timeout=10)
    try:
        _ensure_table(conn)
        conn.executemany(
            "INSERT OR REPLACE INTO gdelt_events (event_id, event_time, title, lat, lng, impact_score, url) VALUES (?, ?, ?, ?, ?, ?, ?)",
            [
                (
                    e["event_id"],
                    e.get("event_time", ""),
                    e.get("title", ""),
                    e.get("lat", 0),
                    e.get("lng", 0),
                    e.get("impact_score", 1),
                    e.get("url", ""),
                )
                for e in events
            ],
        )
        # Rough battle-damage model (display only).
        high = sum(1 for x in events if x.get("impact_score", 0) >= 7)
        strikes = sum(
            1
            for x in events
            if "strike" in (x.get("title") or "").lower()
            or "attack" in (x.get("title") or "").lower()
        )
        casualties = min(5000, high * 80 + len(events) * 10)  # estimate
        conn.execute(
            "INSERT OR REPLACE INTO conflict_stats (id, total_events, high_impact_events, estimated_casualties, estimated_strike_count, updated_at) VALUES (1, ?, ?, ?, ?, ?)",
            (len(events), high, casualties, strikes, datetime.utcnow().isoformat()),
        )
        conn.commit()
        try:
            # BUGFIX: previously a failure here (e.g. missing `situation`
            # table) rolled back the gdelt_events/conflict_stats writes too.
            conn.execute(
                "INSERT OR REPLACE INTO situation (id, data, updated_at) VALUES (1, '{}', ?)",
                (datetime.utcnow().strftime("%Y-%m-%dT%H:%M:%S.000Z"),),
            )
            conn.commit()
        except sqlite3.Error as e:
            print(f" [warn] situation update skipped: {e}")
    except Exception as e:
        print(f"写入 DB 失败: {e}")
        conn.rollback()
    finally:
        conn.close()
|
||
|
||
|
||
def _notify_node() -> None:
    """Best-effort POST to the Node backend so it reloads fresh data.

    Always bypasses the system proxy; never raises."""
    try:
        resp = requests.post(
            f"{API_BASE}/api/crawler/notify",
            timeout=5,
            proxies={"http": None, "https": None},
        )
    except Exception as e:
        print(f" [warn] notify API: {e}")
    else:
        if resp.status_code != 200:
            print(" [warn] notify API 失败")
|
||
|
||
|
||
def _rss_to_gdelt_fallback() -> None:
    """When GDELT is disabled, mirror situation_update rows into gdelt_events
    so the map still shows conflict points.

    Coordinates are inferred from the summary text, impact from severity.
    No-op unless GDELT_DISABLED is set and the DB file exists.
    """
    if not GDELT_DISABLED or not os.path.exists(DB_PATH):
        return
    try:
        conn = sqlite3.connect(DB_PATH, timeout=10)
        try:
            rows = conn.execute(
                "SELECT id, timestamp, category, summary, severity FROM situation_update ORDER BY timestamp DESC LIMIT 50"
            ).fetchall()
        finally:
            # BUGFIX: the connection leaked when the SELECT raised (e.g. the
            # table not yet created); close it unconditionally.
            conn.close()
        events = []
        for uid, ts, cat, summary, sev in rows:
            lng, lat = _infer_coords((summary or "")[:300])
            events.append({
                "event_id": f"rss_{uid}",
                "event_time": ts,
                "title": (summary or "")[:500],
                "lat": lat,
                "lng": lng,
                "impact_score": _severity_to_score(sev),
                "url": "",
            })
        if events:
            global EVENT_CACHE
            EVENT_CACHE = events
            _write_to_db(events)
            _notify_node()
    except Exception as e:
        print(f" [warn] RSS→gdelt fallback: {e}")
|
||
|
||
|
||
# ==========================
# RSS news fetch: dedupe + store articles -> AI extraction -> panel data to DB -> notify frontend
# ==========================
# Snapshot of the most recent RSS fetch, exposed via /crawler/status.
LAST_FETCH = {"items": 0, "inserted": 0, "error": None}
|
||
|
||
|
||
def fetch_news() -> None:
    """RSS pipeline: fetch, translate/clean, dedupe into the DB, run AI
    extraction, and notify the Node backend. Never raises; failures are
    recorded in LAST_FETCH["error"] and logged."""
    try:
        from scrapers.rss_scraper import fetch_all
        from db_writer import write_updates
        from news_storage import save_and_dedup
        from translate_utils import translate_to_chinese
        from cleaner_ai import clean_news_for_panel
        from cleaner_ai import ensure_category, ensure_severity

        LAST_FETCH["error"] = None
        items = fetch_all()
        # Normalize each item in place: translate, trim, validate enums.
        for entry in items:
            zh_title = translate_to_chinese(entry.get("title", "") or "")
            zh_summary = translate_to_chinese(entry.get("summary", "") or entry.get("title", ""))
            entry["title"] = clean_news_for_panel(zh_title, max_len=80)
            entry["summary"] = clean_news_for_panel(zh_summary or zh_title, max_len=120)
            entry["category"] = ensure_category(entry.get("category", "other"))
            entry["severity"] = ensure_severity(entry.get("severity", "medium"))
            entry["source"] = entry.get("source") or "rss"

        # 1. Historical dedupe: article content goes into news_content
        #    (a separate table, for downstream consumers).
        new_items, n_news = save_and_dedup(items, db_path=DB_PATH)
        # 2. Panel display: fresh items go into situation_update
        #    (feeds the frontend's recentUpdates).
        n_panel = write_updates(new_items) if new_items else 0
        LAST_FETCH["items"] = len(items)
        LAST_FETCH["inserted"] = n_news
        # 3. AI extraction, merged into combat_losses / key_location etc.
        if new_items:
            _extract_and_merge_panel_data(new_items)
        # With GDELT disabled, backfill gdelt_events from RSS so the map
        # still has conflict points.
        if GDELT_DISABLED:
            _rss_to_gdelt_fallback()
            _notify_node()
        print(f"[{datetime.now().strftime('%H:%M:%S')}] RSS 抓取 {len(items)} 条,去重后新增 {n_news} 条资讯,面板 {n_panel} 条")
    except Exception as e:
        LAST_FETCH["error"] = str(e)
        print(f"[{datetime.now().strftime('%H:%M:%S')}] 新闻抓取失败: {e}")
|
||
|
||
|
||
def _extract_and_merge_panel_data(items: list) -> None:
    """Run AI extraction over fresh news items and merge results into the DB.

    Picks an extractor backend by configuration (DashScope when an API key
    is set, rule-based when CLEANER_AI_DISABLED=1, a default AI extractor
    otherwise), normalizes each item's publish time to a UTC ISO string,
    and notifies the Node backend if anything merged. Best-effort: all
    failures are logged, never raised.
    """
    if not items or not os.path.exists(DB_PATH):
        return
    try:
        from db_merge import merge
        use_dashscope = bool(os.environ.get("DASHSCOPE_API_KEY", "").strip())
        if use_dashscope:
            from extractor_dashscope import extract_from_news
            limit = 10
        elif os.environ.get("CLEANER_AI_DISABLED", "0") == "1":
            from extractor_rules import extract_from_news
            # Rule-based extraction is cheap, so a larger batch is allowed.
            limit = 25
        else:
            from extractor_ai import extract_from_news
            limit = 10
        from datetime import timezone
        merged_any = False
        for it in items[:limit]:
            text = (it.get("title", "") or "") + " " + (it.get("summary", "") or "")
            # Skip items too short to carry extractable facts.
            if len(text.strip()) < 20:
                continue
            pub = it.get("published")
            ts = None
            if pub:
                try:
                    # `published` may be an ISO string (possibly "Z"-suffixed)
                    # or already a datetime; normalize to UTC "...000Z".
                    if isinstance(pub, str):
                        pub_dt = datetime.fromisoformat(pub.replace("Z", "+00:00"))
                    else:
                        pub_dt = pub
                    if pub_dt.tzinfo:
                        pub_dt = pub_dt.astimezone(timezone.utc)
                    ts = pub_dt.strftime("%Y-%m-%dT%H:%M:%S.000Z")
                except Exception:
                    pass
            extracted = extract_from_news(text, timestamp=ts)
            if extracted:
                if merge(extracted, db_path=DB_PATH):
                    merged_any = True
        if merged_any:
            _notify_node()
    except Exception as e:
        print(f" [warn] AI 面板数据提取/合并: {e}")
|
||
|
||
|
||
# ==========================
# Scheduled work (plain asyncio background task, avoiding the APScheduler
# executor-shutdown race)
# ==========================
# Handle of the periodic fetch task; created on startup, cancelled on shutdown.
_bg_task: Optional[asyncio.Task] = None
|
||
|
||
|
||
async def _periodic_fetch() -> None:
    """Background loop: run the blocking fetchers in the default thread
    executor, then sleep for the shorter of the two configured intervals.

    Exits cleanly on task cancellation; other errors are logged and the
    loop continues.
    """
    # get_running_loop() is the supported call inside a coroutine;
    # get_event_loop() here is deprecated since Python 3.10.
    loop = asyncio.get_running_loop()
    while True:
        try:
            await loop.run_in_executor(None, fetch_news)
            await loop.run_in_executor(None, fetch_gdelt_events)
        except asyncio.CancelledError:
            break
        except Exception as e:
            print(f" [warn] 定时抓取: {e}")
        await asyncio.sleep(min(RSS_INTERVAL_SEC, FETCH_INTERVAL_SEC))
|
||
|
||
|
||
# ==========================
# API endpoints
# ==========================
@app.post("/crawler/backfill")
def crawler_backfill():
    """Re-extract and merge combat-loss / retaliation data from existing
    situation_update rows.

    Repairs history fetched before extraction existed. Returns a JSON dict
    with ok/processed/merged, or ok=False with an error message.
    """
    if not os.path.exists(DB_PATH):
        return {"ok": False, "error": "db not found"}
    try:
        from db_merge import merge
        if os.environ.get("CLEANER_AI_DISABLED", "0") == "1":
            from extractor_rules import extract_from_news
        else:
            from extractor_ai import extract_from_news
        conn = sqlite3.connect(DB_PATH, timeout=10)
        try:
            rows = conn.execute(
                "SELECT id, timestamp, category, summary FROM situation_update ORDER BY timestamp DESC LIMIT 50"
            ).fetchall()
        finally:
            # BUGFIX: close even when the SELECT raises (e.g. table missing);
            # previously the connection leaked into the outer except.
            conn.close()
        merged = 0
        for uid, ts, cat, summary in rows:
            text = ((cat or "") + " " + (summary or "")).strip()
            if len(text) < 20:
                continue
            try:
                extracted = extract_from_news(text, timestamp=ts)
                if extracted and merge(extracted, db_path=DB_PATH):
                    merged += 1
            except Exception:
                # Per-row extraction is best-effort; one bad row must not
                # abort the whole backfill.
                pass
        _notify_node()
        return {"ok": True, "processed": len(rows), "merged": merged}
    except Exception as e:
        return {"ok": False, "error": str(e)}
|
||
|
||
|
||
@app.get("/crawler/status")
def crawler_status():
    """Crawler health snapshot, used to debug the data-update pipeline."""
    # NOTE: the redundant function-local `import os` was removed — os is
    # already imported at module level.
    db_ok = os.path.exists(DB_PATH)
    total = 0
    if db_ok:
        try:
            conn = sqlite3.connect(DB_PATH, timeout=3)
            try:
                total = conn.execute("SELECT COUNT(*) FROM situation_update").fetchone()[0]
            finally:
                # Close even when the query fails (table may not exist yet).
                conn.close()
        except Exception:
            # Report 0 rather than failing the status endpoint.
            pass
    return {
        "db_path": DB_PATH,
        "db_exists": db_ok,
        "situation_update_count": total,
        "last_fetch_items": LAST_FETCH.get("items", 0),
        "last_fetch_inserted": LAST_FETCH.get("inserted", 0),
        "last_fetch_error": LAST_FETCH.get("error"),
    }
|
||
|
||
|
||
@app.get("/events")
def get_events():
    """Return the cached GDELT events plus aggregated conflict statistics."""
    payload = {
        "updated_at": datetime.utcnow().isoformat(),
        "count": len(EVENT_CACHE),
        "events": EVENT_CACHE,
    }
    payload["conflict_stats"] = _get_conflict_stats()
    return payload
|
||
|
||
|
||
def _get_conflict_stats() -> dict:
    """Read the single conflict_stats row; all-zero dict when the DB file,
    table or row is missing."""
    # Hoisted: the all-zero default was duplicated in two return sites.
    empty = {
        "total_events": 0,
        "high_impact_events": 0,
        "estimated_casualties": 0,
        "estimated_strike_count": 0,
    }
    if not os.path.exists(DB_PATH):
        return empty
    try:
        conn = sqlite3.connect(DB_PATH, timeout=5)
        try:
            row = conn.execute(
                "SELECT total_events, high_impact_events, estimated_casualties, estimated_strike_count FROM conflict_stats WHERE id = 1"
            ).fetchone()
        finally:
            # BUGFIX: close even when the query raises; previously the
            # connection leaked into the silent except below.
            conn.close()
        if row:
            return {
                "total_events": row[0],
                "high_impact_events": row[1],
                "estimated_casualties": row[2],
                "estimated_strike_count": row[3],
            }
    except Exception:
        pass
    return empty
|
||
|
||
|
||
@app.on_event("startup")
async def startup():
    """Run one immediate fetch of news and GDELT events (in the default
    thread executor), then start the periodic background task."""
    global _bg_task
    # get_running_loop() is the supported call inside a coroutine;
    # get_event_loop() here is deprecated since Python 3.10.
    loop = asyncio.get_running_loop()
    await loop.run_in_executor(None, fetch_news)
    await loop.run_in_executor(None, fetch_gdelt_events)
    _bg_task = asyncio.create_task(_periodic_fetch())
|
||
|
||
|
||
@app.on_event("shutdown")
async def shutdown():
    """Cancel the background fetch task and wait for it to unwind."""
    global _bg_task
    task = _bg_task
    if task is None or task.done():
        return
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        pass
|
||
|
||
|
||
if __name__ == "__main__":
    import uvicorn

    # Direct development entry point; binds on all interfaces, port 8000.
    uvicorn.run(app, host="0.0.0.0", port=8000)
|