# -*- coding: utf-8 -*-
"""Standalone storage for news content with historical de-duplication.

Pipeline: the crawler pulls data -> compute ``content_hash`` -> skip the
item if the hash already exists (dedup) -> persist new rows into the
``news_content`` table.
"""
import hashlib
import os
import re
import sqlite3
from datetime import datetime, timezone
from typing import List, Optional, Tuple


def _to_utc_iso(dt: datetime) -> str:
    """Format *dt* as an ISO-8601 UTC string with a fixed ``.000Z`` suffix.

    Aware datetimes are converted to UTC first; naive datetimes are assumed
    to already be UTC and formatted as-is. Sub-second precision is dropped.
    """
    if dt.tzinfo:
        dt = dt.astimezone(timezone.utc)
    return dt.strftime("%Y-%m-%dT%H:%M:%S.000Z")


def _normalize_for_hash(text: str) -> str:
    """Normalize text for the dedup hash.

    Lowercases, collapses runs of whitespace to single spaces, truncates to
    600 characters and strips ASCII control characters, so near-identical
    content hashes to the same value.
    """
    if not text:
        return ""
    t = re.sub(r"\s+", " ", str(text).strip().lower())[:600]
    return re.sub(r"[\x00-\x1f]", "", t)


def content_hash(title: str, summary: str, url: str) -> str:
    """Build a 32-hex-char dedup hash from title, summary and URL.

    Similar content (case/whitespace variants) maps to the same hash and
    is therefore treated as a duplicate.
    """
    raw = (
        _normalize_for_hash(title)
        + "|"
        + _normalize_for_hash(summary)
        + "|"
        + (url or "").strip()
    )
    return hashlib.sha256(raw.encode("utf-8")).hexdigest()[:32]


def _ensure_table(conn: sqlite3.Connection) -> None:
    """Create the ``news_content`` table and its indexes if missing."""
    conn.execute("""
        CREATE TABLE IF NOT EXISTS news_content (
            id TEXT PRIMARY KEY,
            content_hash TEXT NOT NULL UNIQUE,
            title TEXT NOT NULL,
            summary TEXT NOT NULL,
            url TEXT NOT NULL DEFAULT '',
            source TEXT NOT NULL DEFAULT '',
            published_at TEXT NOT NULL,
            category TEXT NOT NULL DEFAULT 'other',
            severity TEXT NOT NULL DEFAULT 'medium',
            created_at TEXT NOT NULL DEFAULT (datetime('now'))
        )
    """)
    # Index creation is best-effort: older schemas may conflict, and the
    # UNIQUE column constraint already guarantees dedup correctness.
    try:
        conn.execute(
            "CREATE UNIQUE INDEX IF NOT EXISTS idx_news_content_hash "
            "ON news_content(content_hash)"
        )
    except sqlite3.OperationalError:
        pass
    try:
        conn.execute(
            "CREATE INDEX IF NOT EXISTS idx_news_content_pub "
            "ON news_content(published_at DESC)"
        )
    except sqlite3.OperationalError:
        pass
    conn.commit()


def exists_by_hash(conn: sqlite3.Connection, h: str) -> bool:
    """Return True when a row with content_hash *h* is already stored."""
    row = conn.execute(
        "SELECT 1 FROM news_content WHERE content_hash = ? LIMIT 1", (h,)
    ).fetchone()
    return row is not None


def insert_news(
    conn: sqlite3.Connection,
    *,
    title: str,
    summary: str,
    url: str = "",
    source: str = "",
    published: datetime,
    category: str = "other",
    severity: str = "medium",
) -> Optional[str]:
    """Insert one news item; skip it when its content_hash already exists.

    Returns:
        The newly inserted row id, or None when the item was a duplicate.
    """
    _ensure_table(conn)
    h = content_hash(title, summary, url)
    if exists_by_hash(conn, h):
        return None
    # Id is derived from the hash plus the current UTC time. Collisions
    # between equal hashes are irrelevant: equal hashes are deduped above.
    # datetime.now(timezone.utc) replaces the deprecated datetime.utcnow().
    now_iso = datetime.now(timezone.utc).isoformat()
    uid = "nc_" + hashlib.sha256(f"{h}{now_iso}".encode()).hexdigest()[:14]
    ts = _to_utc_iso(published)
    try:
        conn.execute(
            """INSERT INTO news_content
            (id, content_hash, title, summary, url, source, published_at, category, severity)
            VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)""",
            (uid, h, (title or "")[:500], (summary or "")[:2000],
             (url or "")[:500], (source or "")[:100], ts, category, severity),
        )
    except sqlite3.IntegrityError:
        # Lost a race with a concurrent writer on UNIQUE(content_hash):
        # treat it exactly like the exists_by_hash() fast path.
        return None
    conn.commit()
    return uid


def save_and_dedup(items: List[dict], db_path: Optional[str] = None) -> Tuple[List[dict], int]:
    """De-duplicate *items* and persist the survivors into news_content.

    Args:
        items: [{"title", "summary", "url", "published", "category",
            "severity", "source"?}, ...]. "published" may be a datetime,
            an ISO-8601 string (a trailing "Z" is accepted) or absent
            (defaults to now, UTC).
        db_path: explicit SQLite file path; falls back to config.DB_PATH.

    Returns:
        (items that passed dedup, each augmented with "news_id",
         number of rows actually inserted).
    """
    if db_path:
        path = db_path
    else:
        # Deferred import: keeps this module importable/testable when the
        # project-level config module is unavailable.
        from config import DB_PATH
        path = DB_PATH
    if not os.path.exists(path):
        # The database file must already exist; otherwise store nothing.
        return [], 0
    conn = sqlite3.connect(path, timeout=10)
    try:
        _ensure_table(conn)
        new_items: List[dict] = []
        count = 0
        for u in items:
            title = (u.get("title") or "")[:500]
            # Fall back to the title when the item carries no summary.
            summary = (u.get("summary") or u.get("title") or "")[:2000]
            url = (u.get("url") or "")[:500]
            source = (u.get("source") or "")[:100]
            pub = u.get("published")
            if isinstance(pub, str):
                try:
                    pub = datetime.fromisoformat(pub.replace("Z", "+00:00"))
                except ValueError:
                    pub = datetime.now(timezone.utc)
            elif pub is None:
                pub = datetime.now(timezone.utc)
            uid = insert_news(
                conn,
                title=title,
                summary=summary,
                url=url,
                source=source,
                published=pub,
                category=u.get("category", "other"),
                severity=u.get("severity", "medium"),
            )
            if uid:
                count += 1
                new_items.append({**u, "news_id": uid})
        return new_items, count
    finally:
        conn.close()