Files
usa/crawler/realtime_conflict_service.py
2026-03-02 01:00:04 +08:00

287 lines
10 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# -*- coding: utf-8 -*-
"""
GDELT 实时冲突抓取 + API 服务
核心数据源：GDELT Project，约 15 分钟级更新，含经纬度、事件编码、参与方、事件强度
"""
import os
# Connect to the internet directly so a system-wide proxy cannot cause
# ProxyError / timeouts (set CRAWLER_USE_PROXY=1 when a proxy is required).
if os.getenv("CRAWLER_USE_PROXY") != "1":
    os.environ.setdefault("NO_PROXY", "*")
import hashlib
import sqlite3
from datetime import datetime
from pathlib import Path
from typing import List, Optional
import requests
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
from apscheduler.schedulers.background import BackgroundScheduler
# FastAPI application; CORS is opened to every origin and HTTP method.
app = FastAPI(title="GDELT Conflict Service")
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_methods=["*"],
)
# Configuration (every value can be overridden through the environment)
PROJECT_ROOT = Path(__file__).resolve().parent.parent
DB_PATH = os.getenv("DB_PATH", str(PROJECT_ROOT / "server" / "data.db"))
API_BASE = os.getenv("API_BASE", "http://localhost:3001")
QUERY = os.getenv("GDELT_QUERY", "United States Iran military")
MAX_RECORDS = int(os.getenv("GDELT_MAX_RECORDS", "30"))
FETCH_INTERVAL_SEC = int(os.getenv("FETCH_INTERVAL_SEC", "60"))
# News is fetched more often than GDELT so the event timeline stays current.
RSS_INTERVAL_SEC = int(os.getenv("RSS_INTERVAL_SEC", "45"))
# Time window: 1h = one hour, 1d = one day, 1week = one week. When unset the
# default is 3 months, which tends to return old articles.
GDELT_TIMESPAN = os.getenv("GDELT_TIMESPAN", "1d")
# Set to 1 to skip GDELT and use only RSS news for the event timeline
# (GDELT may be unreachable from some networks).
GDELT_DISABLED = os.getenv("GDELT_DISABLED", "0") == "1"
# Iran as the attack origin (default when an article has no coordinates)
IRAN_COORD = [51.3890, 35.6892]  # Tehran [lng, lat]
# Requests go out directly, bypassing any system proxy (avoids ProxyError /
# proxy timeouts) unless CRAWLER_USE_PROXY=1.
_REQ_KW = dict(timeout=15, headers={"User-Agent": "US-Iran-Dashboard/1.0"})
if os.getenv("CRAWLER_USE_PROXY") != "1":
    _REQ_KW.update(proxies={"http": None, "https": None})
# Most recent batch of parsed GDELT events, served by the /events endpoint.
EVENT_CACHE: List[dict] = []
# ==========================
# 冲突强度评分 (1-10)
# ==========================
def calculate_impact_score(title: str) -> int:
    """Score a headline's conflict intensity on a 1-10 scale.

    The score starts at 1. Each keyword group found in the lower-cased title
    adds its weight once, and the total is capped at 10.

    Args:
        title: Headline text. ``None`` or an empty string is treated as
            containing no keywords.

    Returns:
        An integer between 1 and 10 inclusive.
    """
    # (keywords, weight): a group contributes once if ANY of its keywords
    # occurs as a substring of the title.
    keyword_weights = (
        (("missile",), 3),
        (("strike",), 2),
        (("killed", "death", "casualt"), 4),
        (("troops", "soldier"), 2),
        (("attack",), 3),  # substring match also covers "attacked"
        # The previous check was `"nuclear" in t or "" in t`; the empty
        # string is a substring of every string, so every title got +4.
        (("nuclear",), 4),
        (("explosion", "blast", "bomb"), 2),
    )
    text = (title or "").lower()
    score = 1 + sum(
        weight
        for keywords, weight in keyword_weights
        if any(word in text for word in keywords)
    )
    return min(score, 10)
# ==========================
# 获取 GDELT 实时事件
# ==========================
def _parse_article(article: dict) -> Optional[dict]:
    """Convert one GDELT article record into an event dict.

    Args:
        article: A single article object from the GDELT response. The keys
            read are ``title``, ``seendate``, ``url``, ``socialimage``,
            ``lat`` and ``lng``.

    Returns:
        A dict with the keys ``event_id``, ``event_time``, ``title``, ``lat``,
        ``lng``, ``impact_score`` and ``url``, or ``None`` when the article
        has neither a title nor a seendate.
    """
    import math

    from translate_utils import translate_to_chinese

    title_raw = article.get("title") or article.get("seendate") or ""
    if not title_raw:
        return None
    # Coerce once so a non-string title cannot break the keyword scoring,
    # which calls .lower() on its argument.
    title_raw = str(title_raw)
    title = translate_to_chinese(title_raw[:500])
    url = article.get("url") or article.get("socialimage") or ""
    seendate = article.get("seendate") or datetime.utcnow().isoformat()

    # IRAN_COORD is stored as [lng, lat]; it is the fallback position.
    default_lng, default_lat = IRAN_COORD
    try:
        # float(None) raises TypeError, so missing coordinates fall through
        # to the default just like unparsable ones.
        lat, lng = float(article.get("lat")), float(article.get("lng"))
    except (TypeError, ValueError):
        lat, lng = default_lat, default_lng
    else:
        # float() accepts "nan"/"inf"; such values cannot be represented in
        # standard JSON, so treat them as missing coordinates.
        if not (math.isfinite(lat) and math.isfinite(lng)):
            lat, lng = default_lat, default_lng

    # The score is computed on the original (untranslated) title.
    impact = calculate_impact_score(title_raw)
    # Identifier derived from URL + seen date (first 24 hex chars of SHA-256).
    event_id = hashlib.sha256(f"{url}{seendate}".encode()).hexdigest()[:24]
    return {
        "event_id": event_id,
        "event_time": seendate,
        "title": title[:500],
        "lat": lat,
        "lng": lng,
        "impact_score": impact,
        "url": url,
    }
def fetch_gdelt_events() -> None:
    """Fetch the latest articles from the GDELT DOC API and publish them.

    Does nothing when ``GDELT_DISABLED`` is set. Otherwise the parsed events
    replace ``EVENT_CACHE`` (newest first), are written to SQLite, and the
    Node API is notified. Any failure is printed and swallowed so the
    scheduler job never raises.
    """
    global EVENT_CACHE
    if GDELT_DISABLED:
        return

    from urllib.parse import quote, urlencode

    # Percent-encode every parameter so characters such as "&", "#" or "+" in
    # the configured query cannot corrupt the URL. quote (not quote_plus)
    # keeps spaces encoded as %20.
    query_string = urlencode(
        {
            "query": QUERY,
            "mode": "ArtList",
            "format": "json",
            "maxrecords": MAX_RECORDS,
            "timespan": GDELT_TIMESPAN,
            "sort": "datedesc",
        },
        quote_via=quote,
    )
    url = f"https://api.gdeltproject.org/api/v2/doc/doc?{query_string}"
    try:
        resp = requests.get(url, **_REQ_KW)
        resp.raise_for_status()
        data = resp.json()
        # Accept either {"articles": [...]} or a bare list; anything else
        # is treated as "no articles".
        articles = data.get("articles", data) if isinstance(data, dict) else data
        if not isinstance(articles, list):
            articles = []
        parsed = (_parse_article(a) for a in articles if isinstance(a, dict))
        new_events = [ev for ev in parsed if ev]
        # Sort by event_time, newest first.
        new_events.sort(key=lambda e: e.get("event_time", ""), reverse=True)
        EVENT_CACHE = new_events
        # Persist to SQLite and notify the Node service.
        _write_to_db(new_events)
        _notify_node()
        print(f"[{datetime.now().strftime('%H:%M:%S')}] GDELT 更新 {len(new_events)} 条事件")
    except Exception as e:
        print(f"GDELT 抓取失败: {e}")
def _ensure_table(conn: sqlite3.Connection) -> None:
    """Create the ``gdelt_events`` and ``conflict_stats`` tables if missing.

    Args:
        conn: Open SQLite connection; the DDL is committed before returning.
    """
    ddl_statements = (
        """
        CREATE TABLE IF NOT EXISTS gdelt_events (
            event_id TEXT PRIMARY KEY,
            event_time TEXT NOT NULL,
            title TEXT NOT NULL,
            lat REAL NOT NULL,
            lng REAL NOT NULL,
            impact_score INTEGER NOT NULL,
            url TEXT,
            created_at TEXT DEFAULT (datetime('now'))
        )
        """,
        # Single-row table: the CHECK constraint pins the only row to id = 1.
        """
        CREATE TABLE IF NOT EXISTS conflict_stats (
            id INTEGER PRIMARY KEY CHECK (id = 1),
            total_events INTEGER NOT NULL,
            high_impact_events INTEGER NOT NULL,
            estimated_casualties INTEGER NOT NULL,
            estimated_strike_count INTEGER NOT NULL,
            updated_at TEXT NOT NULL
        )
        """,
    )
    for ddl in ddl_statements:
        conn.execute(ddl)
    conn.commit()
def _write_to_db(events: List[dict]) -> None:
    """Persist events and refresh the aggregate statistics in SQLite.

    Does nothing when the file at ``DB_PATH`` does not exist. Otherwise every
    event is upserted into ``gdelt_events``, the single ``conflict_stats`` row
    is replaced, and row 1 of ``situation`` is replaced with an empty JSON
    object plus the current UTC timestamp. On any error the message is
    printed and the pending inserts are rolled back.

    Args:
        events: Event dicts; ``event_id`` is required, the other fields fall
            back to defaults when absent.
    """
    if not os.path.exists(DB_PATH):
        return
    db = sqlite3.connect(DB_PATH, timeout=10)
    try:
        _ensure_table(db)
        event_rows = [
            (
                ev["event_id"],
                ev.get("event_time", ""),
                ev.get("title", ""),
                ev.get("lat", 0),
                ev.get("lng", 0),
                ev.get("impact_score", 1),
                ev.get("url", ""),
            )
            for ev in events
        ]
        db.executemany(
            "INSERT OR REPLACE INTO gdelt_events (event_id, event_time, title, lat, lng, impact_score, url) VALUES (?, ?, ?, ?, ?, ?, ?)",
            event_rows,
        )

        # Damage statistics model (for display only).
        # NOTE(review): `title` appears to be the translated headline stored
        # by _parse_article, so the English keywords below may rarely match
        # - confirm against translate_to_chinese.
        high_impact = 0
        strike_count = 0
        for ev in events:
            if ev.get("impact_score", 0) >= 7:
                high_impact += 1
            headline = (ev.get("title") or "").lower()
            if "strike" in headline or "attack" in headline:
                strike_count += 1
        casualty_estimate = min(5000, high_impact * 80 + len(events) * 10)  # rough estimate
        db.execute(
            "INSERT OR REPLACE INTO conflict_stats (id, total_events, high_impact_events, estimated_casualties, estimated_strike_count, updated_at) VALUES (1, ?, ?, ?, ?, ?)",
            (len(events), high_impact, casualty_estimate, strike_count, datetime.utcnow().isoformat()),
        )

        # NOTE(review): this replaces situation.data with '{}' on every run;
        # the table is not created in this file - verify that resetting
        # `data` is intended and not just a way to bump updated_at.
        situation_stamp = datetime.utcnow().strftime("%Y-%m-%dT%H:%M:%S.000Z")
        db.execute(
            "INSERT OR REPLACE INTO situation (id, data, updated_at) VALUES (1, '{}', ?)",
            (situation_stamp,),
        )
        db.commit()
    except Exception as e:
        print(f"写入 DB 失败: {e}")
        db.rollback()
    finally:
        db.close()
def _notify_node() -> None:
    """POST to the Node API's crawler-notify endpoint.

    Best effort: a non-200 status or any exception only prints a warning.
    The request always bypasses proxies.
    """
    endpoint = f"{API_BASE}/api/crawler/notify"
    direct = {"http": None, "https": None}
    try:
        response = requests.post(endpoint, timeout=5, proxies=direct)
        if response.status_code != 200:
            print(" [warn] notify API 失败")
    except Exception as e:
        print(f" [warn] notify API: {e}")
# ==========================
# RSS 新闻抓取（补充 situation_update）
# ==========================
def fetch_news() -> None:
    """Fetch RSS news, translate title and summary, and store the items.

    The Node API is notified only when ``write_updates`` reports at least one
    written row. Any failure is printed and swallowed so the scheduler job
    never raises.
    """
    try:
        from scrapers.rss_scraper import fetch_all
        from db_writer import write_updates
        from translate_utils import translate_to_chinese

        items = fetch_all()
        for entry in items:
            entry["title"] = translate_to_chinese(entry.get("title") or "")
            # With no summary, fall back to the title, which has already been
            # replaced by its translation on the line above.
            summary_source = entry.get("summary") or entry.get("title", "")
            entry["summary"] = translate_to_chinese(summary_source)
        if not items:
            return
        written = write_updates(items)
        if written > 0:
            _notify_node()
        print(f"[{datetime.now().strftime('%H:%M:%S')}] 新闻入库 {written}")
    except Exception as e:
        print(f"新闻抓取失败: {e}")
# ==========================
# 定时任务（RSS 更频繁，优先保证事件脉络实时）
# ==========================
# Background scheduler: both fetchers run as interval jobs, started at import time.
scheduler = BackgroundScheduler()
scheduler.add_job(func=fetch_news, trigger="interval", seconds=RSS_INTERVAL_SEC)
scheduler.add_job(func=fetch_gdelt_events, trigger="interval", seconds=FETCH_INTERVAL_SEC)
scheduler.start()
# ==========================
# API 接口
# ==========================
@app.get("/events")
def get_events():
    """Return the cached GDELT events together with the aggregate statistics.

    Returns:
        A dict with ``updated_at`` (current UTC time, ISO format), ``count``,
        ``events`` and ``conflict_stats``.
    """
    updated_at = datetime.utcnow().isoformat()
    # Snapshot the cache once: the scheduled fetch_gdelt_events job rebinds
    # EVENT_CACHE, so reading the global twice could report a ``count`` that
    # does not match the ``events`` list.
    events = EVENT_CACHE
    return {
        "updated_at": updated_at,
        "count": len(events),
        "events": events,
        "conflict_stats": _get_conflict_stats(),
    }
def _get_conflict_stats() -> dict:
    """Read the single ``conflict_stats`` row from SQLite.

    Returns:
        A dict with ``total_events``, ``high_impact_events``,
        ``estimated_casualties`` and ``estimated_strike_count``. All values
        are 0 when the database file is missing, the row does not exist, or
        the read fails.
    """
    fields = (
        "total_events",
        "high_impact_events",
        "estimated_casualties",
        "estimated_strike_count",
    )
    if not os.path.exists(DB_PATH):
        return dict.fromkeys(fields, 0)
    row = None
    try:
        conn = sqlite3.connect(DB_PATH, timeout=5)
        try:
            row = conn.execute(
                "SELECT total_events, high_impact_events, estimated_casualties, estimated_strike_count FROM conflict_stats WHERE id = 1"
            ).fetchone()
        finally:
            # Close even when the query raises (e.g. the table does not
            # exist yet); previously the connection leaked in that case.
            conn.close()
    except Exception as e:
        # Still best effort, but no longer silent.
        print(f" [warn] conflict_stats: {e}")
    if row:
        return dict(zip(fields, row))
    return dict.fromkeys(fields, 0)
@app.on_event("startup")
def startup():
    """Run both fetchers once at application startup.

    News is fetched first so the event timeline has data as early as possible.
    """
    for initial_fetch in (fetch_news, fetch_gdelt_events):
        initial_fetch()
if __name__ == "__main__":
    # Run the API with uvicorn on all interfaces, port 8000, when executed as a script.
    import uvicorn

    server_options = {"host": "0.0.0.0", "port": 8000}
    uvicorn.run(app, **server_options)