# -*- coding: utf-8 -*- """ 统一写库流水线:抓取 → 清洗 → 去重 → 映射到前端库字段 → 更新表 → 通知 与 server/README.md 第五节「爬虫侧写库链路」一致,供 main.py 与 realtime_conflict_service 共用。 """ import os from datetime import datetime, timezone from typing import Callable, Optional, Tuple from config import DB_PATH, API_BASE def _notify_api(api_base: str) -> bool: """调用 Node API 触发立即广播""" try: import urllib.request req = urllib.request.Request( f"{api_base.rstrip('/')}/api/crawler/notify", method="POST", headers={"Content-Type": "application/json"}, ) with urllib.request.urlopen(req, timeout=5) as resp: return resp.status == 200 except Exception as e: print(f" [warn] notify API failed: {e}") return False def _extract_and_merge(items: list, db_path: str) -> bool: """AI 从新闻全文或标题+摘要中提取精确结构化数据,合并到 combat_losses / key_location 等表。""" if not items or not os.path.exists(db_path): return False try: from db_merge import merge use_dashscope = bool(os.environ.get("DASHSCOPE_API_KEY", "").strip()) if use_dashscope: from extractor_dashscope import extract_from_news limit = 10 elif os.environ.get("CLEANER_AI_DISABLED", "0") == "1": from extractor_rules import extract_from_news limit = 25 else: from extractor_ai import extract_from_news limit = 10 merged_any = False for it in items[:limit]: # 优先用正文(article_fetcher 抓取),否则用标题+摘要,供 AI 提取精确数字 text = it.get("full_text") or ((it.get("title", "") or "") + " " + (it.get("summary", "") or "")) if len(text.strip()) < 20: continue pub = it.get("published") ts = None if pub: try: if isinstance(pub, str): pub_dt = datetime.fromisoformat(pub.replace("Z", "+00:00")) else: pub_dt = pub if pub_dt.tzinfo: pub_dt = pub_dt.astimezone(timezone.utc) ts = pub_dt.strftime("%Y-%m-%dT%H:%M:%S.000Z") except Exception: pass extracted = extract_from_news(text, timestamp=ts) if extracted and merge(extracted, db_path=db_path): merged_any = True return merged_any except Exception as e: print(f" [warn] AI 面板数据提取/合并: {e}") return False def run_full_pipeline( db_path: Optional[str] = None, api_base: Optional[str] = None, *, translate: bool = True, notify: bool = True, on_notify: Optional[Callable[[], None]] = None, ) -> Tuple[int, int, int]: """ 执行完整写库链路: 1. 爬虫抓取实时数据 2. AI 清洗(标题/摘要/分类)→ 有效数据 3. 去重(news_content content_hash)→ 仅新项进入后续 4. 有效数据映射到前端库字段(situation_update、news_content、combat_losses 等) 5. 更新数据库表;若有更新则通知后端 translate: 是否对标题/摘要做翻译(英→中) notify: 是否在流水线末尾调用 POST /api/crawler/notify on_notify: 若提供,在通知前调用(供 gdelt 服务做 GDELT 回填等) 返回: (本轮抓取条数, 去重后新增资讯数, 写入 situation_update 条数) """ path = db_path or DB_PATH base = api_base or API_BASE from scrapers.rss_scraper import fetch_all from db_writer import write_updates from news_storage import save_and_dedup from cleaner_ai import clean_news_for_panel, ensure_category, ensure_severity # 1. 抓取 items = fetch_all() if not items: return 0, 0, 0 # 2. 清洗(标题/摘要/分类,符合面板 schema) if translate: from translate_utils import translate_to_chinese for it in items: raw_title = translate_to_chinese(it.get("title", "") or "") raw_summary = translate_to_chinese(it.get("summary", "") or it.get("title", "")) it["title"] = clean_news_for_panel(raw_title, max_len=80) it["summary"] = clean_news_for_panel(raw_summary or raw_title, max_len=120) else: for it in items: it["title"] = clean_news_for_panel(it.get("title", "") or "", max_len=80) it["summary"] = clean_news_for_panel(it.get("summary", "") or it.get("title", ""), max_len=120) for it in items: it["category"] = ensure_category(it.get("category", "other")) it["severity"] = ensure_severity(it.get("severity", "medium")) it["source"] = it.get("source") or "rss" # 3. 去重:落库 news_content,仅新项返回 new_items, n_news = save_and_dedup(items, db_path=path) # 3.5 数据增强:为参与 AI 提取的条目抓取正文,便于从全文提取精确数据(伤亡、基地等) if new_items: try: from article_fetcher import enrich_item_with_body # 仅对前若干条抓取正文,避免单轮请求过多 enrich_limit = int(os.environ.get("ARTICLE_FETCH_LIMIT", "10")) for it in new_items[:enrich_limit]: enrich_item_with_body(it) except Exception as e: print(f" [warn] 正文抓取: {e}") # 4. 映射到前端库字段并更新表 n_panel = write_updates(new_items) if new_items else 0 if new_items: _extract_and_merge(new_items, path) # 5. 通知(有新增时才通知;可选:先执行外部逻辑如 GDELT 回填,再通知) if on_notify: on_notify() if notify and (n_panel > 0 or n_news > 0): _notify_api(base) return len(items), n_news, n_panel