fix: 优化数据

This commit is contained in:
Daniel
2026-03-02 11:28:13 +08:00
parent 4a8fff5a00
commit 004d10b283
39 changed files with 1106 additions and 56 deletions

View File

@@ -14,11 +14,13 @@ from datetime import datetime
from pathlib import Path
from typing import List, Optional
import logging
import requests
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
from apscheduler.schedulers.background import BackgroundScheduler
logging.getLogger("apscheduler.scheduler").setLevel(logging.ERROR)
app = FastAPI(title="GDELT Conflict Service")
app.add_middleware(CORSMiddleware, allow_origins=["*"], allow_methods=["*"])
@@ -29,7 +31,7 @@ API_BASE = os.environ.get("API_BASE", "http://localhost:3001")
QUERY = os.environ.get("GDELT_QUERY", "United States Iran military")
MAX_RECORDS = int(os.environ.get("GDELT_MAX_RECORDS", "30"))
FETCH_INTERVAL_SEC = int(os.environ.get("FETCH_INTERVAL_SEC", "60"))
RSS_INTERVAL_SEC = int(os.environ.get("RSS_INTERVAL_SEC", "45")) # 新闻抓取更频繁,优先保证事件脉络
RSS_INTERVAL_SEC = int(os.environ.get("RSS_INTERVAL_SEC", "60")) # 每分钟抓取世界主流媒体
# 时间范围1h=1小时 1d=1天 1week=1周不设则默认 3 个月(易返回旧文)
GDELT_TIMESPAN = os.environ.get("GDELT_TIMESPAN", "1d")
# 设为 1 则跳过 GDELT仅用 RSS 新闻作为事件脉络GDELT 国外可能无法访问)
@@ -77,7 +79,9 @@ def _parse_article(article: dict) -> Optional[dict]:
if not title_raw:
return None
from translate_utils import translate_to_chinese
from cleaner_ai import clean_news_for_panel
title = translate_to_chinese(str(title_raw)[:500])
title = clean_news_for_panel(title, max_len=150)
url = article.get("url") or article.get("socialimage") or ""
seendate = article.get("seendate") or datetime.utcnow().isoformat()
lat = article.get("lat")
@@ -134,8 +138,8 @@ def fetch_gdelt_events() -> None:
_write_to_db(new_events)
_notify_node()
print(f"[{datetime.now().strftime('%H:%M:%S')}] GDELT 更新 {len(new_events)} 条事件")
except Exception as e:
print(f"GDELT 抓取失败: {e}")
except Exception:
pass
def _ensure_table(conn: sqlite3.Connection) -> None:
@@ -213,38 +217,115 @@ def _notify_node() -> None:
# ==========================
# RSS 新闻抓取(补充 situation_update
# RSS 新闻抓取(补充 situation_update + AI 提取面板数据
# ==========================
LAST_FETCH = {"items": 0, "inserted": 0, "error": None}
def fetch_news() -> None:
try:
from scrapers.rss_scraper import fetch_all
from db_writer import write_updates
from translate_utils import translate_to_chinese
from cleaner_ai import clean_news_for_panel
from cleaner_ai import ensure_category, ensure_severity
LAST_FETCH["error"] = None
items = fetch_all()
for it in items:
it["title"] = translate_to_chinese(it.get("title", "") or "")
it["summary"] = translate_to_chinese(it.get("summary", "") or it.get("title", ""))
raw_title = translate_to_chinese(it.get("title", "") or "")
raw_summary = translate_to_chinese(it.get("summary", "") or it.get("title", ""))
it["title"] = clean_news_for_panel(raw_title, max_len=80)
it["summary"] = clean_news_for_panel(raw_summary or raw_title, max_len=120)
it["category"] = ensure_category(it.get("category", "other"))
it["severity"] = ensure_severity(it.get("severity", "medium"))
n = write_updates(items) if items else 0
LAST_FETCH["items"] = len(items)
LAST_FETCH["inserted"] = n
if items:
n = write_updates(items)
_extract_and_merge_panel_data(items)
if n > 0:
_notify_node()
print(f"[{datetime.now().strftime('%H:%M:%S')}] 新闻入库 {n}")
print(f"[{datetime.now().strftime('%H:%M:%S')}] RSS 抓取 {len(items)} 条,新增入库 {n}")
except Exception as e:
print(f"新闻抓取失败: {e}")
LAST_FETCH["error"] = str(e)
print(f"[{datetime.now().strftime('%H:%M:%S')}] 新闻抓取失败: {e}")
def _extract_and_merge_panel_data(items: list) -> None:
"""对新闻做 AI/规则 提取,合并到 combat_losses / retaliation / wall_street_trend 等表"""
if not items or not os.path.exists(DB_PATH):
return
try:
from db_merge import merge
if os.environ.get("CLEANER_AI_DISABLED", "0") == "1":
from extractor_rules import extract_from_news
else:
from extractor_ai import extract_from_news
from datetime import timezone
merged_any = False
# 只对前几条有足够文本的新闻做提取,避免 Ollama 调用过多
for it in items[:5]:
text = (it.get("title", "") or "") + " " + (it.get("summary", "") or "")
if len(text.strip()) < 20:
continue
pub = it.get("published")
ts = None
if pub:
try:
if isinstance(pub, str):
pub_dt = datetime.fromisoformat(pub.replace("Z", "+00:00"))
else:
pub_dt = pub
if pub_dt.tzinfo:
pub_dt = pub_dt.astimezone(timezone.utc)
ts = pub_dt.strftime("%Y-%m-%dT%H:%M:%S.000Z")
except Exception:
pass
extracted = extract_from_news(text, timestamp=ts)
if extracted:
if merge(extracted, db_path=DB_PATH):
merged_any = True
if merged_any:
_notify_node()
except Exception as e:
print(f" [warn] AI 面板数据提取/合并: {e}")
# ==========================
# 定时任务RSS 更频繁,优先保证事件脉络实时)
# ==========================
scheduler = BackgroundScheduler()
scheduler.add_job(fetch_news, "interval", seconds=RSS_INTERVAL_SEC)
scheduler.add_job(fetch_gdelt_events, "interval", seconds=FETCH_INTERVAL_SEC)
scheduler.add_job(fetch_news, "interval", seconds=RSS_INTERVAL_SEC, max_instances=2, coalesce=True)
scheduler.add_job(fetch_gdelt_events, "interval", seconds=FETCH_INTERVAL_SEC, max_instances=2, coalesce=True)
scheduler.start()
# ==========================
# API 接口
# ==========================
@app.get("/crawler/status")
def crawler_status():
"""爬虫状态:用于排查数据更新链路"""
import os
db_ok = os.path.exists(DB_PATH)
total = 0
if db_ok:
try:
conn = sqlite3.connect(DB_PATH, timeout=3)
total = conn.execute("SELECT COUNT(*) FROM situation_update").fetchone()[0]
conn.close()
except Exception:
pass
return {
"db_path": DB_PATH,
"db_exists": db_ok,
"situation_update_count": total,
"last_fetch_items": LAST_FETCH.get("items", 0),
"last_fetch_inserted": LAST_FETCH.get("inserted", 0),
"last_fetch_error": LAST_FETCH.get("error"),
}
@app.get("/events")
def get_events():
return {