fix: 修复爬虫问题

This commit is contained in:
Daniel
2026-03-02 17:20:31 +08:00
parent 33e4786cd0
commit 0027074b8b
21 changed files with 523 additions and 16 deletions

Binary file not shown.

Binary file not shown.

View File

@@ -10,6 +10,9 @@ DB_PATH = os.environ.get("DB_PATH", str(PROJECT_ROOT / "server" / "data.db"))
# Node API 地址(用于通知推送)
API_BASE = os.environ.get("API_BASE", "http://localhost:3001")
# 阿里云 DashScope API Key用于 AI 提取面板数据,不设则回退到规则/Ollama
DASHSCOPE_API_KEY = os.environ.get("DASHSCOPE_API_KEY", "")
# 抓取间隔(秒)
CRAWL_INTERVAL = int(os.environ.get("CRAWL_INTERVAL", "300"))

View File

@@ -0,0 +1,121 @@
# -*- coding: utf-8 -*-
"""
阿里云 DashScope通义千问提取面板结构化数据
从新闻文本中提取战损、报复指数、基地状态等,供 db_merge 落库
API Key 通过环境变量 DASHSCOPE_API_KEY 配置
"""
import json
import os
import re
from datetime import datetime, timezone
from typing import Any, Dict, Optional
from panel_schema import validate_category, validate_severity, validate_summary
def _call_dashscope_extract(text: str, timeout: int = 15) -> Optional[Dict[str, Any]]:
"""调用阿里云 DashScope 提取结构化数据"""
api_key = os.environ.get("DASHSCOPE_API_KEY", "").strip()
if not api_key or not text or len(str(text).strip()) < 10:
return None
try:
import dashscope
from http import HTTPStatus
dashscope.api_key = api_key
prompt = f"""从以下美伊/中东军事新闻中提取可明确推断的数值,输出 JSON。无依据的字段省略不写。
要求:
- summary: 1-2句中文事实摘要≤80字
- category: deployment|alert|intel|diplomatic|other
- severity: low|medium|high|critical
- 战损(仅当新闻明确提及数字时填写):
us_personnel_killed, iran_personnel_killed, us_personnel_wounded, iran_personnel_wounded,
us_civilian_killed, iran_civilian_killed, us_civilian_wounded, iran_civilian_wounded,
us_bases_destroyed, iran_bases_destroyed, us_bases_damaged, iran_bases_damaged,
us_aircraft, iran_aircraft, us_warships, iran_warships, us_armor, iran_armor, us_vehicles, iran_vehicles
- retaliation_sentiment: 0-100仅当新闻涉及伊朗报复/反击情绪时
- wall_street_value: 0-100仅当新闻涉及美股/市场反应时
- key_location_updates: 当新闻提及具体基地遭袭时,数组 [{{"name_keywords":"阿萨德|asad|assad","side":"us","status":"attacked","damage_level":1-3}}]
原文:
{str(text)[:800]}
直接输出 JSON不要其他解释"""
response = dashscope.Generation.call(
model="qwen-turbo",
messages=[{"role": "user", "content": prompt}],
result_format="message",
max_tokens=512,
)
if response.status_code != HTTPStatus.OK:
return None
raw = (response.output.get("choices", [{}])[0].get("message", {}).get("content", "") or "").strip()
raw = re.sub(r"^```\w*\s*|\s*```$", "", raw)
return json.loads(raw)
except Exception:
return None
def extract_from_news(text: str, timestamp: Optional[str] = None) -> Dict[str, Any]:
"""
从新闻文本提取结构化数据,符合面板 schema
返回: { situation_update?, combat_losses_delta?, retaliation?, wall_street?, key_location_updates? }
"""
ts = timestamp or datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%S.000Z")
out: Dict[str, Any] = {}
parsed = _call_dashscope_extract(text)
if not parsed:
return out
if parsed.get("summary"):
out["situation_update"] = {
"summary": validate_summary(str(parsed["summary"])[:120], 120),
"category": validate_category(str(parsed.get("category", "other")).lower()),
"severity": validate_severity(str(parsed.get("severity", "medium")).lower()),
"timestamp": ts,
}
loss_us = {}
loss_ir = {}
for k in ["personnel_killed", "personnel_wounded", "civilian_killed", "civilian_wounded",
"bases_destroyed", "bases_damaged", "aircraft", "warships", "armor", "vehicles"]:
uk, ik = f"us_{k}", f"iran_{k}"
if uk in parsed and isinstance(parsed[uk], (int, float)):
loss_us[k] = max(0, int(parsed[uk]))
if ik in parsed and isinstance(parsed[ik], (int, float)):
loss_ir[k] = max(0, int(parsed[ik]))
if loss_us or loss_ir:
out["combat_losses_delta"] = {}
if loss_us:
out["combat_losses_delta"]["us"] = loss_us
if loss_ir:
out["combat_losses_delta"]["iran"] = loss_ir
if "retaliation_sentiment" in parsed:
v = parsed["retaliation_sentiment"]
if isinstance(v, (int, float)) and 0 <= v <= 100:
out["retaliation"] = {"value": int(v), "time": ts}
if "wall_street_value" in parsed:
v = parsed["wall_street_value"]
if isinstance(v, (int, float)) and 0 <= v <= 100:
out["wall_street"] = {"time": ts, "value": int(v)}
if "key_location_updates" in parsed and isinstance(parsed["key_location_updates"], list):
valid = []
for u in parsed["key_location_updates"]:
if isinstance(u, dict) and u.get("name_keywords") and u.get("side") in ("us", "iran"):
valid.append({
"name_keywords": str(u["name_keywords"]),
"side": u["side"],
"status": str(u.get("status", "attacked"))[:20],
"damage_level": min(3, max(1, int(u["damage_level"]))) if isinstance(u.get("damage_level"), (int, float)) else 2,
})
if valid:
out["key_location_updates"] = valid
return out

141
crawler/news_storage.py Normal file
View File

@@ -0,0 +1,141 @@
# -*- coding: utf-8 -*-
"""
资讯内容独立存储,支持历史去重
爬虫拉回数据 → 计算 content_hash → 若已存在则跳过(去重)→ 新数据落库 news_content
"""
import hashlib
import os
import re
import sqlite3
from datetime import datetime, timezone
from typing import List, Optional, Tuple
from config import DB_PATH
def _to_utc_iso(dt: datetime) -> str:
if dt.tzinfo:
dt = dt.astimezone(timezone.utc)
return dt.strftime("%Y-%m-%dT%H:%M:%S.000Z")
def _normalize_for_hash(text: str) -> str:
"""归一化文本用于生成去重 hash"""
if not text:
return ""
t = re.sub(r"\s+", " ", str(text).strip().lower())[:600]
return re.sub(r"[\x00-\x1f]", "", t)
def content_hash(title: str, summary: str, url: str) -> str:
"""根据标题、摘要、URL 生成去重 hash相似内容视为重复"""
raw = _normalize_for_hash(title) + "|" + _normalize_for_hash(summary) + "|" + (url or "").strip()
return hashlib.sha256(raw.encode("utf-8")).hexdigest()[:32]
def _ensure_table(conn: sqlite3.Connection) -> None:
conn.execute("""
CREATE TABLE IF NOT EXISTS news_content (
id TEXT PRIMARY KEY,
content_hash TEXT NOT NULL UNIQUE,
title TEXT NOT NULL,
summary TEXT NOT NULL,
url TEXT NOT NULL DEFAULT '',
source TEXT NOT NULL DEFAULT '',
published_at TEXT NOT NULL,
category TEXT NOT NULL DEFAULT 'other',
severity TEXT NOT NULL DEFAULT 'medium',
created_at TEXT NOT NULL DEFAULT (datetime('now'))
)
""")
try:
conn.execute("CREATE UNIQUE INDEX IF NOT EXISTS idx_news_content_hash ON news_content(content_hash)")
except sqlite3.OperationalError:
pass
try:
conn.execute("CREATE INDEX IF NOT EXISTS idx_news_content_pub ON news_content(published_at DESC)")
except sqlite3.OperationalError:
pass
conn.commit()
def exists_by_hash(conn: sqlite3.Connection, h: str) -> bool:
row = conn.execute("SELECT 1 FROM news_content WHERE content_hash = ? LIMIT 1", (h,)).fetchone()
return row is not None
def insert_news(
conn: sqlite3.Connection,
*,
title: str,
summary: str,
url: str = "",
source: str = "",
published: datetime,
category: str = "other",
severity: str = "medium",
) -> Optional[str]:
"""
插入资讯,若 content_hash 已存在则跳过(去重)
返回: 新插入的 id或 None 表示重复跳过
"""
_ensure_table(conn)
h = content_hash(title, summary, url)
if exists_by_hash(conn, h):
return None
uid = "nc_" + hashlib.sha256(f"{h}{datetime.utcnow().isoformat()}".encode()).hexdigest()[:14]
ts = _to_utc_iso(published)
conn.execute(
"""INSERT INTO news_content (id, content_hash, title, summary, url, source, published_at, category, severity)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)""",
(uid, h, (title or "")[:500], (summary or "")[:2000], (url or "")[:500], (source or "")[:100], ts, category, severity),
)
conn.commit()
return uid
def save_and_dedup(items: List[dict], db_path: Optional[str] = None) -> Tuple[List[dict], int]:
"""
去重后落库 news_content
items: [{"title","summary","url","published","category","severity","source"?}, ...]
返回: (通过去重的新项列表, 实际新增条数)
"""
path = db_path or DB_PATH
if not os.path.exists(path):
return [], 0
conn = sqlite3.connect(path, timeout=10)
try:
_ensure_table(conn)
new_items: List[dict] = []
count = 0
for u in items:
title = (u.get("title") or "")[:500]
summary = (u.get("summary") or u.get("title") or "")[:2000]
url = (u.get("url") or "")[:500]
source = (u.get("source") or "")[:100]
pub = u.get("published")
if isinstance(pub, str):
try:
pub = datetime.fromisoformat(pub.replace("Z", "+00:00"))
except ValueError:
pub = datetime.now(timezone.utc)
elif pub is None:
pub = datetime.now(timezone.utc)
cat = u.get("category", "other")
sev = u.get("severity", "medium")
uid = insert_news(
conn,
title=title,
summary=summary,
url=url,
source=source,
published=pub,
category=cat,
severity=sev,
)
if uid:
count += 1
new_items.append({**u, "news_id": uid})
return new_items, count
finally:
conn.close()

View File

@@ -283,7 +283,7 @@ def _rss_to_gdelt_fallback() -> None:
# ==========================
# RSS 新闻抓取(补充 situation_update + AI 提取面板数据
# RSS 新闻抓取:资讯落库(去重) → AI 提取面板数据落库 → 通知前端
# ==========================
LAST_FETCH = {"items": 0, "inserted": 0, "error": None}
@@ -292,6 +292,7 @@ def fetch_news() -> None:
try:
from scrapers.rss_scraper import fetch_all
from db_writer import write_updates
from news_storage import save_and_dedup
from translate_utils import translate_to_chinese
from cleaner_ai import clean_news_for_panel
from cleaner_ai import ensure_category, ensure_severity
@@ -304,36 +305,44 @@ def fetch_news() -> None:
it["summary"] = clean_news_for_panel(raw_summary or raw_title, max_len=120)
it["category"] = ensure_category(it.get("category", "other"))
it["severity"] = ensure_severity(it.get("severity", "medium"))
n = write_updates(items) if items else 0
it["source"] = it.get("source") or "rss"
# 1. 历史去重:资讯内容落库 news_content独立表便于后续消费
new_items, n_news = save_and_dedup(items, db_path=DB_PATH)
# 2. 面板展示:新增资讯写入 situation_update供前端 recentUpdates
n_panel = write_updates(new_items) if new_items else 0
LAST_FETCH["items"] = len(items)
LAST_FETCH["inserted"] = n
if items:
_extract_and_merge_panel_data(items)
LAST_FETCH["inserted"] = n_news
# 3. AI 提取 + 合并到 combat_losses / key_location 等
if new_items:
_extract_and_merge_panel_data(new_items)
# GDELT 禁用时用 RSS 填充 gdelt_events使地图有冲突点
if GDELT_DISABLED:
_rss_to_gdelt_fallback()
# 每次抓取完成都通知 Node 更新时间戳,便于「实时更新」显示
_notify_node()
print(f"[{datetime.now().strftime('%H:%M:%S')}] RSS 抓取 {len(items)} 条,新增入库 {n}")
print(f"[{datetime.now().strftime('%H:%M:%S')}] RSS 抓取 {len(items)} 条,去重后新增 {n_news} 条资讯,面板 {n_panel}")
except Exception as e:
LAST_FETCH["error"] = str(e)
print(f"[{datetime.now().strftime('%H:%M:%S')}] 新闻抓取失败: {e}")
def _extract_and_merge_panel_data(items: list) -> None:
"""对新闻做 AI/规则 提取,合并到 combat_losses / retaliation / wall_street_trend 等表"""
"""AI 分析提取面板相关数据,清洗后落库"""
if not items or not os.path.exists(DB_PATH):
return
try:
from db_merge import merge
if os.environ.get("CLEANER_AI_DISABLED", "0") == "1":
use_dashscope = bool(os.environ.get("DASHSCOPE_API_KEY", "").strip())
if use_dashscope:
from extractor_dashscope import extract_from_news
limit = 10
elif os.environ.get("CLEANER_AI_DISABLED", "0") == "1":
from extractor_rules import extract_from_news
limit = 25
else:
from extractor_ai import extract_from_news
limit = 10
from datetime import timezone
merged_any = False
# 规则模式可多处理几条(无 OllamaAI 模式限制 5 条避免调用过多
limit = 25 if os.environ.get("CLEANER_AI_DISABLED", "0") == "1" else 10
for it in items[:limit]:
text = (it.get("title", "") or "") + " " + (it.get("summary", "") or "")
if len(text.strip()) < 20:

View File

@@ -3,3 +3,4 @@ feedparser>=6.0.0
fastapi>=0.109.0
uvicorn>=0.27.0
deep-translator>=1.11.0
dashscope>=1.20.0