Files
usa/crawler/pipeline.py
2026-03-03 17:27:55 +08:00

155 lines
6.1 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# -*- coding: utf-8 -*-
"""
统一写库流水线:抓取 → 清洗 → 去重 → 映射到前端库字段 → 更新表 → 通知
与 server/README.md 第五节「爬虫侧写库链路」一致,供 main.py 与 realtime_conflict_service 共用。
"""
import os
from datetime import datetime, timezone
from typing import Callable, Optional, Tuple
from config import DB_PATH, API_BASE
def _notify_api(api_base: str) -> bool:
    """POST to the Node API's /api/crawler/notify endpoint to trigger an immediate broadcast.

    Best-effort: returns True only on HTTP 200; any failure (bad URL, timeout,
    connection error) is logged and reported as False — it never raises.
    """
    try:
        import urllib.request

        endpoint = f"{api_base.rstrip('/')}/api/crawler/notify"
        headers = {"Content-Type": "application/json"}
        # Optional shared-secret header, only sent when configured.
        token = os.environ.get("API_CRAWLER_TOKEN", "").strip()
        if token:
            headers["X-Crawler-Token"] = token
        request = urllib.request.Request(endpoint, method="POST", headers=headers)
        with urllib.request.urlopen(request, timeout=5) as resp:
            return resp.status == 200
    except Exception as e:
        print(f" [warn] notify API failed: {e}")
        return False
def _ts_from_published(pub) -> Optional[str]:
    """Normalize an item's `published` value (ISO-8601 string or datetime) to the
    frontend timestamp format ``YYYY-MM-DDTHH:MM:SS.000Z``; returns None when
    missing or unparsable.

    NOTE(review): naive datetimes are formatted as-is yet still tagged "Z" —
    assumes upstream naive values are already UTC; confirm against scrapers.
    """
    if not pub:
        return None
    try:
        if isinstance(pub, str):
            pub_dt = datetime.fromisoformat(pub.replace("Z", "+00:00"))
        else:
            pub_dt = pub
        if pub_dt.tzinfo:
            pub_dt = pub_dt.astimezone(timezone.utc)
        return pub_dt.strftime("%Y-%m-%dT%H:%M:%S.000Z")
    except Exception:
        return None


def _extract_and_merge(items: list, db_path: str) -> bool:
    """AI-extract precise structured data from each news item's full text (or
    title + summary) and merge it into tables such as combat_losses / key_location.

    items: cleaned news dicts (keys used: full_text, title, summary, published)
    db_path: path to the SQLite database; must already exist.
    Returns True if at least one item produced data that was merged.

    Fix vs. previous version: extraction/merge errors are now handled per item,
    so one failing item no longer aborts the remaining items nor discards the
    success of earlier merges.
    """
    if not items or not os.path.exists(db_path):
        return False
    try:
        from db_merge import merge
        # Choose extractor backend and per-round item budget:
        # DashScope > rules-only (AI disabled) > default AI extractor.
        use_dashscope = bool(os.environ.get("DASHSCOPE_API_KEY", "").strip())
        if use_dashscope:
            from extractor_dashscope import extract_from_news
            limit = 10
        elif os.environ.get("CLEANER_AI_DISABLED", "0") == "1":
            from extractor_rules import extract_from_news
            limit = 25
        else:
            from extractor_ai import extract_from_news
            limit = 10
    except Exception as e:
        print(f" [warn] AI 面板数据提取/合并: {e}")
        return False
    merged_any = False
    for it in items[:limit]:
        # Prefer the fetched article body (from article_fetcher); otherwise fall
        # back to title + summary so the extractor still gets usable text.
        text = it.get("full_text") or ((it.get("title", "") or "") + " " + (it.get("summary", "") or ""))
        if len(text.strip()) < 20:
            continue
        try:
            extracted = extract_from_news(text, timestamp=_ts_from_published(it.get("published")))
            if extracted and merge(extracted, db_path=db_path):
                merged_any = True
        except Exception as e:
            # Per-item failure: log and continue with the next item.
            print(f" [warn] AI 面板数据提取/合并: {e}")
    return merged_any
def run_full_pipeline(
    db_path: Optional[str] = None,
    api_base: Optional[str] = None,
    *,
    translate: bool = True,
    notify: bool = True,
    on_notify: Optional[Callable[[], None]] = None,
) -> Tuple[int, int, int]:
    """
    Run the full write-to-database pipeline:
      1. Scrape realtime data
      2. AI cleaning (title/summary/category) -> valid items
      3. Dedup (news_content content_hash) -> only new items continue
      4. Map valid items to frontend table fields (situation_update, news_content, combat_losses, ...)
      5. Update database tables; notify the backend if anything was updated

    db_path: SQLite path; defaults to config.DB_PATH when None
    api_base: backend base URL; defaults to config.API_BASE when None
    translate: whether to translate titles/summaries (English -> Chinese)
    notify: whether to call POST /api/crawler/notify at the end of the pipeline
    on_notify: if provided, called before notifying (lets the gdelt service do GDELT backfill etc.)
    Returns: (items fetched this round, new news items after dedup, rows written to situation_update)
    """
    path = db_path or DB_PATH
    base = api_base or API_BASE
    # Imports are function-local so importing this module stays cheap and
    # side-effect free for callers that never run the pipeline.
    from scrapers.rss_scraper import fetch_all
    from db_writer import write_updates
    from news_storage import save_and_dedup
    from cleaner_ai import clean_news_for_panel, ensure_category, ensure_severity
    # 1. Fetch
    items = fetch_all()
    if not items:
        return 0, 0, 0
    # 2. Clean (title/summary/category, conforming to the panel schema)
    if translate:
        from translate_utils import translate_to_chinese
        for it in items:
            raw_title = translate_to_chinese(it.get("title", "") or "")
            # Summary falls back to the title when the feed gives no summary.
            raw_summary = translate_to_chinese(it.get("summary", "") or it.get("title", ""))
            it["title"] = clean_news_for_panel(raw_title, max_len=80)
            it["summary"] = clean_news_for_panel(raw_summary or raw_title, max_len=120)
    else:
        for it in items:
            it["title"] = clean_news_for_panel(it.get("title", "") or "", max_len=80)
            it["summary"] = clean_news_for_panel(it.get("summary", "") or it.get("title", ""), max_len=120)
    for it in items:
        # Normalize category/severity to the panel's allowed values; default source tag.
        it["category"] = ensure_category(it.get("category", "other"))
        it["severity"] = ensure_severity(it.get("severity", "medium"))
        it["source"] = it.get("source") or "rss"
    # 3. Dedup: persist to news_content; only previously-unseen items are returned
    new_items, n_news = save_and_dedup(items, db_path=path)
    # 3.5 Enrichment: fetch article bodies for the items that will go through AI
    #     extraction, so precise figures (casualties, bases, ...) can come from full text
    if new_items:
        try:
            from article_fetcher import enrich_item_with_body
            # Only fetch bodies for the first few items to cap per-round HTTP requests
            enrich_limit = int(os.environ.get("ARTICLE_FETCH_LIMIT", "10"))
            for it in new_items[:enrich_limit]:
                enrich_item_with_body(it)
        except Exception as e:
            print(f" [warn] 正文抓取: {e}")
    # 4. Map to frontend table fields and update the tables
    n_panel = write_updates(new_items) if new_items else 0
    if new_items:
        _extract_and_merge(new_items, path)
    # 5. Notify (only when something changed; optionally run external logic such
    #    as GDELT backfill first, then notify)
    if on_notify:
        on_notify()
    if notify and (n_panel > 0 or n_news > 0):
        _notify_api(base)
    return len(items), n_news, n_panel