# -*- coding: utf-8 -*-
"""Unified write pipeline: scrape → clean → dedup → map to frontend DB fields → update tables → notify.

Matches section 5 ("crawler-side write path") of server/README.md; shared by
main.py and realtime_conflict_service.
"""
import os
from datetime import datetime, timezone
from typing import Callable, Optional, Tuple

from config import DB_PATH, API_BASE


def _notify_api(api_base: str) -> bool:
    """POST to the Node API to trigger an immediate broadcast.

    Best-effort: returns True on HTTP 200, logs and returns False on any
    failure (network errors must never break the pipeline).
    """
    try:
        import urllib.request

        token = os.environ.get("API_CRAWLER_TOKEN", "").strip()
        req = urllib.request.Request(
            f"{api_base.rstrip('/')}/api/crawler/notify",
            method="POST",
            headers={
                "Content-Type": "application/json",
                # Only attach the auth header when a token is configured.
                **({"X-Crawler-Token": token} if token else {}),
            },
        )
        with urllib.request.urlopen(req, timeout=5) as resp:
            return resp.status == 200
    except Exception as e:
        print(f" [warn] notify API failed: {e}")
        return False


def _iso_utc_z(pub) -> Optional[str]:
    """Normalize a 'published' value (ISO string or datetime) to a
    'YYYY-MM-DDTHH:MM:SS.000Z' UTC string; None when missing/unparseable.

    NOTE(review): naive datetimes get the 'Z' suffix without conversion,
    i.e. they are assumed to already be UTC — confirm against the scrapers.
    """
    if not pub:
        return None
    try:
        if isinstance(pub, str):
            dt = datetime.fromisoformat(pub.replace("Z", "+00:00"))
        else:
            dt = pub
        if dt.tzinfo:
            dt = dt.astimezone(timezone.utc)
        return dt.strftime("%Y-%m-%dT%H:%M:%S.000Z")
    except Exception:
        return None


def _published_utc(it: dict) -> datetime:
    """Best-effort aware-UTC datetime for an item's 'published' field.

    Accepts ISO strings, naive or aware datetimes (naive assumed UTC —
    TODO confirm upstream). Returns aware datetime.min when missing or
    unparseable so such items fall before any start-date cutoff.
    """
    floor = datetime.min.replace(tzinfo=timezone.utc)
    pub = it.get("published")
    if not pub:
        return floor
    if isinstance(pub, str):
        try:
            pub = datetime.fromisoformat(pub.replace("Z", "+00:00"))
        except ValueError:
            return floor
    if pub.tzinfo is None:
        return pub.replace(tzinfo=timezone.utc)
    return pub.astimezone(timezone.utc)


def _extract_and_merge(items: list, db_path: str) -> bool:
    """AI-extract precise structured data from article bodies (or
    title+summary) and merge it into combat_losses / key_location etc.

    Extractor backend is selected by environment:
      - DASHSCOPE_API_KEY set      → extractor_dashscope (limit 10)
      - CLEANER_AI_DISABLED == "1" → extractor_rules     (limit 25)
      - otherwise                  → extractor_ai        (limit 10)

    Returns True when at least one extraction was merged; all failures are
    logged and swallowed (extraction is a best-effort enrichment step).
    """
    if not items or not os.path.exists(db_path):
        return False
    try:
        from db_merge import merge

        use_dashscope = bool(os.environ.get("DASHSCOPE_API_KEY", "").strip())
        if use_dashscope:
            from extractor_dashscope import extract_from_news
            limit = 10
        elif os.environ.get("CLEANER_AI_DISABLED", "0") == "1":
            from extractor_rules import extract_from_news
            limit = 25
        else:
            from extractor_ai import extract_from_news
            limit = 10

        merged_any = False
        for it in items[:limit]:
            # Prefer the full article body (fetched by article_fetcher);
            # fall back to title + summary so the extractor still gets text.
            text = it.get("full_text") or (
                (it.get("title", "") or "") + " " + (it.get("summary", "") or "")
            )
            if len(text.strip()) < 20:
                continue
            ts = _iso_utc_z(it.get("published"))
            extracted = extract_from_news(text, timestamp=ts)
            if extracted and merge(extracted, db_path=db_path):
                merged_any = True
        return merged_any
    except Exception as e:
        print(f" [warn] AI 面板数据提取/合并: {e}")
        return False


def run_full_pipeline(
    db_path: Optional[str] = None,
    api_base: Optional[str] = None,
    *,
    translate: bool = True,
    notify: bool = True,
    on_notify: Optional[Callable[[], None]] = None,
) -> Tuple[int, int, int]:
    """Run the complete write path:

    1. scrape realtime items
    2. AI-clean (title/summary/category) → valid items
    3. dedup (news_content content_hash) → only new items continue
    4. map valid items to frontend DB fields (situation_update,
       news_content, combat_losses, ...)
    5. update DB tables; notify the backend when anything changed

    translate: translate titles/summaries (EN → ZH)
    notify:    call POST /api/crawler/notify at the end of the pipeline
    on_notify: if given, invoked before notifying (e.g. for GDELT backfill)

    Returns: (items scraped this round, new items after dedup,
              rows written to situation_update)
    """
    path = db_path or DB_PATH
    base = api_base or API_BASE
    from scrapers.rss_scraper import fetch_all
    from db_writer import write_updates
    from news_storage import save_and_dedup
    from cleaner_ai import clean_news_for_panel, ensure_category, ensure_severity

    # 1. Scrape
    items = fetch_all()
    if not items:
        return 0, 0, 0

    # Optional: keep only items published after a configured cutoff
    # (e.g. CRAWL_START_DATE=2026-02-28T00:00:00).
    start_date_env = os.environ.get("CRAWL_START_DATE", "").strip()
    if start_date_env:
        try:
            raw = start_date_env.replace("Z", "+00:00").strip()
            start_dt = datetime.fromisoformat(raw)
            if start_dt.tzinfo is None:
                start_dt = start_dt.replace(tzinfo=timezone.utc)
            else:
                start_dt = start_dt.astimezone(timezone.utc)
            before = len(items)
            # _published_utc tolerates string / naive / missing values, so
            # one odd item can no longer abort the whole filter with a
            # TypeError (str/naive vs aware datetime comparison).
            items = [it for it in items if _published_utc(it) >= start_dt]
            if before > len(items):
                print(f" [pipeline] 按 CRAWL_START_DATE={start_date_env} 过滤后保留 {len(items)} 条(原 {before} 条)")
        except Exception as e:
            print(f" [warn] CRAWL_START_DATE 解析失败,忽略: {e}")
    if not items:
        return 0, 0, 0

    n_total = len(items)
    print(f" [pipeline] 抓取 {n_total} 条")
    for i, it in enumerate(items[:5]):
        preview = (it.get("title") or it.get("summary") or "").strip()
        # Truncate the preview; append an ellipsis only when text was cut.
        print(f" [{i + 1}] {preview[:60]}" + ("…" if len(preview) > 60 else ""))
    if n_total > 5:
        print(f" ... 共 {n_total} 条")

    # 2. Clean (title/summary/category, conforming to the panel schema)
    if translate:
        from translate_utils import translate_to_chinese
        for it in items:
            raw_title = translate_to_chinese(it.get("title", "") or "")
            raw_summary = translate_to_chinese(it.get("summary", "") or it.get("title", ""))
            it["title"] = clean_news_for_panel(raw_title, max_len=80)
            it["summary"] = clean_news_for_panel(raw_summary or raw_title, max_len=120)
    else:
        for it in items:
            it["title"] = clean_news_for_panel(it.get("title", "") or "", max_len=80)
            it["summary"] = clean_news_for_panel(it.get("summary", "") or it.get("title", ""), max_len=120)
    for it in items:
        it["category"] = ensure_category(it.get("category", "other"))
        it["severity"] = ensure_severity(it.get("severity", "medium"))
        it["source"] = it.get("source") or "rss"

    # 3. Dedup: persist into news_content; only new items are returned
    new_items, n_news = save_and_dedup(items, db_path=path)
    if new_items:
        print(f" [pipeline] 去重后新增 {n_news} 条,写入事件脉络 {len(new_items)} 条")
        for i, it in enumerate(new_items[:3]):
            preview = (it.get("title") or it.get("summary") or "").strip()
            print(f" 新增 [{i + 1}] {preview[:55]}" + ("…" if len(preview) > 55 else ""))

    # 3.5 Enrichment: fetch article bodies for the items fed to AI
    # extraction so precise figures (casualties, bases, ...) can come
    # from the full text.
    if new_items:
        try:
            from article_fetcher import enrich_item_with_body
            # Only fetch bodies for the first few items to cap per-round requests.
            enrich_limit = int(os.environ.get("ARTICLE_FETCH_LIMIT", "10"))
            for it in new_items[:enrich_limit]:
                enrich_item_with_body(it)
        except Exception as e:
            print(f" [warn] 正文抓取: {e}")

    # 4. Map to frontend DB fields and update tables
    n_panel = write_updates(new_items) if new_items else 0
    if new_items:
        _extract_and_merge(new_items, path)

    # 5. Notify — only when something was written; optionally run external
    # logic first (e.g. GDELT backfill), then ping the backend.
    if on_notify:
        on_notify()
    if notify and (n_panel > 0 or n_news > 0):
        _notify_api(base)
    return len(items), n_news, n_panel