187 lines
7.8 KiB
Python
187 lines
7.8 KiB
Python
# -*- coding: utf-8 -*-
|
||
"""
|
||
统一写库流水线:抓取 → 清洗 → 去重 → 映射到前端库字段 → 更新表 → 通知
|
||
与 server/README.md 第五节「爬虫侧写库链路」一致,供 main.py 与 realtime_conflict_service 共用。
|
||
"""
|
||
import os
|
||
from datetime import datetime, timezone
|
||
from typing import Callable, Optional, Tuple
|
||
|
||
from config import DB_PATH, API_BASE
|
||
|
||
|
||
def _notify_api(api_base: str) -> bool:
|
||
"""调用 Node API 触发立即广播"""
|
||
try:
|
||
import urllib.request
|
||
token = os.environ.get("API_CRAWLER_TOKEN", "").strip()
|
||
req = urllib.request.Request(
|
||
f"{api_base.rstrip('/')}/api/crawler/notify",
|
||
method="POST",
|
||
headers={
|
||
"Content-Type": "application/json",
|
||
**({"X-Crawler-Token": token} if token else {}),
|
||
},
|
||
)
|
||
with urllib.request.urlopen(req, timeout=5) as resp:
|
||
return resp.status == 200
|
||
except Exception as e:
|
||
print(f" [warn] notify API failed: {e}")
|
||
return False
|
||
|
||
|
||
def _extract_and_merge(items: list, db_path: str) -> bool:
|
||
"""AI 从新闻全文或标题+摘要中提取精确结构化数据,合并到 combat_losses / key_location 等表。"""
|
||
if not items or not os.path.exists(db_path):
|
||
return False
|
||
try:
|
||
from db_merge import merge
|
||
use_dashscope = bool(os.environ.get("DASHSCOPE_API_KEY", "").strip())
|
||
if use_dashscope:
|
||
from extractor_dashscope import extract_from_news
|
||
limit = 10
|
||
elif os.environ.get("CLEANER_AI_DISABLED", "0") == "1":
|
||
from extractor_rules import extract_from_news
|
||
limit = 25
|
||
else:
|
||
from extractor_ai import extract_from_news
|
||
limit = 10
|
||
merged_any = False
|
||
for it in items[:limit]:
|
||
# 优先用正文(article_fetcher 抓取),否则用标题+摘要,供 AI 提取精确数字
|
||
text = it.get("full_text") or ((it.get("title", "") or "") + " " + (it.get("summary", "") or ""))
|
||
if len(text.strip()) < 20:
|
||
continue
|
||
pub = it.get("published")
|
||
ts = None
|
||
if pub:
|
||
try:
|
||
if isinstance(pub, str):
|
||
pub_dt = datetime.fromisoformat(pub.replace("Z", "+00:00"))
|
||
else:
|
||
pub_dt = pub
|
||
if pub_dt.tzinfo:
|
||
pub_dt = pub_dt.astimezone(timezone.utc)
|
||
ts = pub_dt.strftime("%Y-%m-%dT%H:%M:%S.000Z")
|
||
except Exception:
|
||
pass
|
||
extracted = extract_from_news(text, timestamp=ts)
|
||
if extracted and merge(extracted, db_path=db_path):
|
||
merged_any = True
|
||
return merged_any
|
||
except Exception as e:
|
||
print(f" [warn] AI 面板数据提取/合并: {e}")
|
||
return False
|
||
|
||
|
||
def run_full_pipeline(
|
||
db_path: Optional[str] = None,
|
||
api_base: Optional[str] = None,
|
||
*,
|
||
translate: bool = True,
|
||
notify: bool = True,
|
||
on_notify: Optional[Callable[[], None]] = None,
|
||
) -> Tuple[int, int, int]:
|
||
"""
|
||
执行完整写库链路:
|
||
1. 爬虫抓取实时数据
|
||
2. AI 清洗(标题/摘要/分类)→ 有效数据
|
||
3. 去重(news_content content_hash)→ 仅新项进入后续
|
||
4. 有效数据映射到前端库字段(situation_update、news_content、combat_losses 等)
|
||
5. 更新数据库表;若有更新则通知后端
|
||
|
||
translate: 是否对标题/摘要做翻译(英→中)
|
||
notify: 是否在流水线末尾调用 POST /api/crawler/notify
|
||
on_notify: 若提供,在通知前调用(供 gdelt 服务做 GDELT 回填等)
|
||
|
||
返回: (本轮抓取条数, 去重后新增资讯数, 写入 situation_update 条数)
|
||
"""
|
||
path = db_path or DB_PATH
|
||
base = api_base or API_BASE
|
||
|
||
from scrapers.rss_scraper import fetch_all
|
||
from db_writer import write_updates
|
||
from news_storage import save_and_dedup
|
||
from cleaner_ai import clean_news_for_panel, ensure_category, ensure_severity
|
||
|
||
# 1. 抓取
|
||
items = fetch_all()
|
||
if not items:
|
||
return 0, 0, 0
|
||
|
||
# 可选:仅保留指定起始时间之后的条目(如 CRAWL_START_DATE=2026-02-28T00:00:00)
|
||
start_date_env = os.environ.get("CRAWL_START_DATE", "").strip()
|
||
if start_date_env:
|
||
try:
|
||
raw = start_date_env.replace("Z", "+00:00").strip()
|
||
start_dt = datetime.fromisoformat(raw)
|
||
if start_dt.tzinfo is None:
|
||
start_dt = start_dt.replace(tzinfo=timezone.utc)
|
||
else:
|
||
start_dt = start_dt.astimezone(timezone.utc)
|
||
before = len(items)
|
||
items = [it for it in items if (it.get("published") or datetime.min.replace(tzinfo=timezone.utc)) >= start_dt]
|
||
if before > len(items):
|
||
print(f" [pipeline] 按 CRAWL_START_DATE={start_date_env} 过滤后保留 {len(items)} 条(原 {before} 条)")
|
||
except Exception as e:
|
||
print(f" [warn] CRAWL_START_DATE 解析失败,忽略: {e}")
|
||
|
||
if not items:
|
||
return 0, 0, 0
|
||
n_total = len(items)
|
||
print(f" [pipeline] 抓取 {n_total} 条")
|
||
for i, it in enumerate(items[:5]):
|
||
title = (it.get("title") or it.get("summary") or "").strip()[:60]
|
||
print(f" [{i + 1}] {title}" + ("…" if len((it.get("title") or it.get("summary") or "")[:60]) >= 60 else ""))
|
||
if n_total > 5:
|
||
print(f" ... 共 {n_total} 条")
|
||
|
||
# 2. 清洗(标题/摘要/分类,符合面板 schema)
|
||
if translate:
|
||
from translate_utils import translate_to_chinese
|
||
for it in items:
|
||
raw_title = translate_to_chinese(it.get("title", "") or "")
|
||
raw_summary = translate_to_chinese(it.get("summary", "") or it.get("title", ""))
|
||
it["title"] = clean_news_for_panel(raw_title, max_len=80)
|
||
it["summary"] = clean_news_for_panel(raw_summary or raw_title, max_len=120)
|
||
else:
|
||
for it in items:
|
||
it["title"] = clean_news_for_panel(it.get("title", "") or "", max_len=80)
|
||
it["summary"] = clean_news_for_panel(it.get("summary", "") or it.get("title", ""), max_len=120)
|
||
for it in items:
|
||
it["category"] = ensure_category(it.get("category", "other"))
|
||
it["severity"] = ensure_severity(it.get("severity", "medium"))
|
||
it["source"] = it.get("source") or "rss"
|
||
|
||
# 3. 去重:落库 news_content,仅新项返回
|
||
new_items, n_news = save_and_dedup(items, db_path=path)
|
||
if new_items:
|
||
print(f" [pipeline] 去重后新增 {n_news} 条,写入事件脉络 {len(new_items)} 条")
|
||
for i, it in enumerate(new_items[:3]):
|
||
title = (it.get("title") or it.get("summary") or "").strip()[:55]
|
||
print(f" 新增 [{i + 1}] {title}" + ("…" if len((it.get("title") or it.get("summary") or "").strip()) > 55 else ""))
|
||
|
||
# 3.5 数据增强:为参与 AI 提取的条目抓取正文,便于从全文提取精确数据(伤亡、基地等)
|
||
if new_items:
|
||
try:
|
||
from article_fetcher import enrich_item_with_body
|
||
# 仅对前若干条抓取正文,避免单轮请求过多
|
||
enrich_limit = int(os.environ.get("ARTICLE_FETCH_LIMIT", "10"))
|
||
for it in new_items[:enrich_limit]:
|
||
enrich_item_with_body(it)
|
||
except Exception as e:
|
||
print(f" [warn] 正文抓取: {e}")
|
||
|
||
# 4. 映射到前端库字段并更新表
|
||
n_panel = write_updates(new_items) if new_items else 0
|
||
if new_items:
|
||
_extract_and_merge(new_items, path)
|
||
|
||
# 5. 通知(有新增时才通知;可选:先执行外部逻辑如 GDELT 回填,再通知)
|
||
if on_notify:
|
||
on_notify()
|
||
if notify and (n_panel > 0 or n_news > 0):
|
||
_notify_api(base)
|
||
|
||
return len(items), n_news, n_panel
|