# -*- coding: utf-8 -*-
"""RSS fetching: pull entries from configured feeds, filter by keyword,
deduplicate, and classify each matching item."""

import re
from datetime import datetime, timezone

import feedparser

from config import RSS_FEEDS, KEYWORDS
from parser_ai import classify_and_severity


def _parse_date(entry) -> datetime:
    """Best-effort extraction of an entry's timestamp as an aware UTC datetime.

    Tries feedparser's ``published_parsed`` then ``updated_parsed`` struct_time
    fields; falls back to the current UTC time when neither is usable.
    """
    for attr in ("published_parsed", "updated_parsed"):
        val = getattr(entry, attr, None)
        if val:
            try:
                return datetime(*val[:6], tzinfo=timezone.utc)
            except (TypeError, ValueError):
                # Malformed struct_time from the feed — try the next field.
                pass
    return datetime.now(timezone.utc)


def _strip_html(s: str) -> str:
    """Strip HTML tags from *s* (naive regex removal; adequate for summaries)."""
    return re.sub(r"<[^>]+>", "", s) if s else ""


def _matches_keywords(text: str) -> bool:
    """Return True when any configured keyword occurs in *text*, case-insensitively."""
    t = (text or "").lower()
    return any(k.lower() in t for k in KEYWORDS)


def fetch_all() -> list[dict]:
    """Fetch all configured feeds and return keyword-matching items.

    Returns a list of dicts with keys: ``title``, ``summary`` (truncated to
    400 chars, falling back to the title), ``url``, ``published`` (aware UTC
    datetime), ``category``, ``severity``. Items are deduplicated by a
    (title-prefix, link) key; a feed that fails to parse is skipped entirely.
    """
    import socket

    items: list[dict] = []
    seen: set[tuple[str, str]] = set()

    # Per-source timeout of 10 seconds so one stalled feed can't hang the run.
    old_timeout = socket.getdefaulttimeout()
    socket.setdefaulttimeout(10)
    try:
        for url in RSS_FEEDS:
            try:
                feed = feedparser.parse(
                    url,
                    request_headers={"User-Agent": "US-Iran-Dashboard/1.0"},
                    agent="US-Iran-Dashboard/1.0",
                )
            except Exception:
                # Best-effort: a broken source must not abort the whole fetch.
                continue
            for entry in feed.entries:
                title = getattr(entry, "title", "") or ""
                raw_summary = (
                    getattr(entry, "summary", "")
                    or getattr(entry, "description", "")
                    or ""
                )
                summary = _strip_html(raw_summary)
                link = getattr(entry, "link", "") or ""
                text = f"{title} {summary}"
                if not _matches_keywords(text):
                    continue
                # Dedup on a truncated-title + link pair across all feeds.
                key = (title[:80], link)
                if key in seen:
                    continue
                seen.add(key)
                published = _parse_date(entry)
                cat, sev = classify_and_severity(text)
                items.append({
                    "title": title,
                    "summary": summary[:400] if summary else title,
                    "url": link,
                    # Fix: reuse the already-parsed date instead of calling
                    # _parse_date(entry) a second time (dead local, double work).
                    "published": published,
                    "category": cat,
                    "severity": sev,
                })
    finally:
        # Always restore the process-wide default socket timeout.
        socket.setdefaulttimeout(old_timeout)
    return items