From 98d928f4573bc08a71929c41af722b69de0648d6 Mon Sep 17 00:00:00 2001 From: Daniel Date: Thu, 5 Mar 2026 19:53:05 +0800 Subject: [PATCH] =?UTF-8?q?fix:=E4=BC=98=E5=8C=96pm2=E9=85=8D=E7=BD=AE?= =?UTF-8?q?=E9=A1=B9?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- crawler/db_merge.py | 17 +++++++++++++ crawler/db_writer.py | 4 +-- crawler/extractor_ai.py | 9 +++++++ crawler/extractor_dashscope.py | 10 ++++++++ crawler/panel_schema.py | 2 +- crawler/parser.py | 18 ++++++++++++-- crawler/parser_ai.py | 14 ++++++++++- crawler/run_uvicorn.sh | 9 +++++++ crawler/scrapers/rss_scraper.py | 9 ++++--- ecosystem.config.cjs | 43 +++++++++++++++++++++++++++++++++ 10 files changed, 125 insertions(+), 10 deletions(-) create mode 100644 crawler/run_uvicorn.sh create mode 100644 ecosystem.config.cjs diff --git a/crawler/db_merge.py b/crawler/db_merge.py index 910ea29..30b6279 100644 --- a/crawler/db_merge.py +++ b/crawler/db_merge.py @@ -183,6 +183,23 @@ def merge(extracted: Dict[str, Any], db_path: Optional[str] = None) -> bool: updated = True except Exception: pass + # force_summary 增量:导弹消耗(看板「导弹消耗」「导弹库存」由 force_summary 提供) + if "force_summary_delta" in extracted: + for side, delta in extracted["force_summary_delta"].items(): + if side not in ("us", "iran"): + continue + mc = delta.get("missile_consumed") + if mc is not None and isinstance(mc, (int, float)) and mc > 0: + mc = min(int(mc), 500) + try: + cur = conn.execute( + "UPDATE force_summary SET missile_consumed = missile_consumed + ?, missile_stock = max(0, missile_stock - ?) WHERE side = ?", + (mc, mc, side), + ) + if cur.rowcount > 0: + updated = True + except Exception: + pass # retaliation if "retaliation" in extracted: r = extracted["retaliation"] diff --git a/crawler/db_writer.py b/crawler/db_writer.py index b27f10c..5577ec8 100644 --- a/crawler/db_writer.py +++ b/crawler/db_writer.py @@ -4,7 +4,7 @@ import sqlite3 import hashlib import os from datetime import datetime, timezone -from typing import Optional +from typing import List, Optional from config import DB_PATH @@ -87,7 +87,7 @@ def touch_situation_updated_at_path(db_path: Optional[str] = None) -> bool: conn.close() -def write_updates(updates: list[dict], db_path: Optional[str] = None) -> int: +def write_updates(updates: List[dict], db_path: Optional[str] = None) -> int: """ updates: [{"title","summary","url","published","category","severity"}, ...] db_path: 与 pipeline 一致,缺省用 config.DB_PATH diff --git a/crawler/extractor_ai.py b/crawler/extractor_ai.py index b83c365..0b26d89 100644 --- a/crawler/extractor_ai.py +++ b/crawler/extractor_ai.py @@ -51,6 +51,7 @@ def _call_ollama_extract(text: str, timeout: int = 15) -> Optional[Dict[str, Any - retaliation_sentiment: 0-100,仅当报道涉及伊朗报复/反击情绪时 - wall_street_value: 0-100,仅当报道涉及美股/市场时 - key_location_updates: **双方攻击地点**。每项 {{ "name_keywords": "阿萨德|asad|al-asad", "side": "us或iran(被打击方)", "status": "attacked", "damage_level": 1-3 }}。美军基地例:阿萨德|asad、乌代德|udeid、埃尔比勒|erbil、因吉尔利克|incirlik。伊朗例:德黑兰|tehran、布什尔|bushehr、伊斯法罕|isfahan、阿巴斯|abbas、纳坦兹|natanz +- **导弹消耗增量**(仅当报道明确提到「发射/消耗 了 X 枚导弹」时填,用于看板导弹消耗累计): us_missile_consumed_delta, iran_missile_consumed_delta(本则报道中该方新增消耗枚数,整数) 原文: {raw} @@ -132,4 +133,12 @@ def extract_from_news(text: str, timestamp: Optional[str] = None) -> Dict[str, A }) if valid: out["key_location_updates"] = valid + # force_summary 增量:导弹消耗(看板「导弹消耗」由 force_summary.missile_consumed 提供) + fs_delta = {} + for side_key, side_val in [("us_missile_consumed_delta", "us"), ("iran_missile_consumed_delta", "iran")]: + v = parsed.get(side_key) + if isinstance(v, (int, float)) and v > 0: + fs_delta[side_val] = {"missile_consumed": min(500, int(v))} + if fs_delta: + out["force_summary_delta"] = fs_delta return out diff --git a/crawler/extractor_dashscope.py b/crawler/extractor_dashscope.py index 9001ba8..cdd2835 100644 --- a/crawler/extractor_dashscope.py +++ b/crawler/extractor_dashscope.py @@ -42,6 +42,7 @@ def _call_dashscope_extract(text: str, timeout: int = 15) -> Optional[Dict[str, - retaliation_sentiment: 0-100(仅当报道涉及伊朗报复情绪时) - wall_street_value: 0-100(仅当报道涉及美股/市场时) - key_location_updates: **双方攻击地点**。每项 {{"name_keywords":"阿萨德|asad","side":"us或iran(被打击方)","status":"attacked","damage_level":1-3}}。美军基地:阿萨德|asad、乌代德|udeid、埃尔比勒|erbil、因吉尔利克|incirlik。伊朗:德黑兰|tehran、布什尔|bushehr、伊斯法罕|isfahan、阿巴斯|abbas、纳坦兹|natanz +- **导弹消耗增量**(仅当报道明确提到「发射/消耗 了 X 枚导弹」时填): us_missile_consumed_delta, iran_missile_consumed_delta(本则该方新增消耗枚数,整数) 原文: {raw} @@ -110,6 +111,15 @@ def extract_from_news(text: str, timestamp: Optional[str] = None) -> Dict[str, A if isinstance(v, (int, float)) and 0 <= v <= 100: out["wall_street"] = {"time": ts, "value": int(v)} + # force_summary 增量:导弹消耗(看板「导弹消耗」) + fs_delta = {} + for key, side in [("us_missile_consumed_delta", "us"), ("iran_missile_consumed_delta", "iran")]: + v = parsed.get(key) + if isinstance(v, (int, float)) and v > 0: + fs_delta[side] = {"missile_consumed": min(500, int(v))} + if fs_delta: + out["force_summary_delta"] = fs_delta + if "key_location_updates" in parsed and isinstance(parsed["key_location_updates"], list): valid = [] for u in parsed["key_location_updates"]: diff --git a/crawler/panel_schema.py b/crawler/panel_schema.py index e9a1ac5..87974f4 100644 --- a/crawler/panel_schema.py +++ b/crawler/panel_schema.py @@ -3,7 +3,7 @@ 前端面板完整数据 schema,与 DB / situationData / useReplaySituation 对齐 爬虫 + AI 清洗后的数据必须符合此 schema 才能正确更新前端 """ -from typing import Any, Dict, List, Literal, Optional, Tuple +from typing import Any, Dict, List, Optional, Tuple # 事件脉络 SITUATION_UPDATE_CATEGORIES = ("deployment", "alert", "intel", "diplomatic", "other") diff --git a/crawler/parser.py b/crawler/parser.py index f069872..42922d4 100644 --- a/crawler/parser.py +++ b/crawler/parser.py @@ -1,7 +1,21 @@ # -*- coding: utf-8 -*- """新闻分类与严重度判定""" import re -from typing import Literal +from typing import List + +try: + from typing import Literal # type: ignore +except ImportError: + try: + from typing_extensions import Literal # type: ignore + except ImportError: + from typing import Any + + class _LiteralFallback: + def __getitem__(self, item): + return Any + + Literal = _LiteralFallback() Category = Literal["deployment", "alert", "intel", "diplomatic", "other"] Severity = Literal["low", "medium", "high", "critical"] @@ -13,7 +27,7 @@ CAT_INTEL = ["satellite", "intel", "image", "surveillance", "卫星", "情报"] CAT_DIPLOMATIC = ["talk", "negotiation", "diplomat", "sanction", "谈判", "制裁"] -def _match(text: str, words: list[str]) -> bool: +def _match(text: str, words: List[str]) -> bool: t = (text or "").lower() for w in words: if w.lower() in t: diff --git a/crawler/parser_ai.py b/crawler/parser_ai.py index cf1ca14..caa5522 100644 --- a/crawler/parser_ai.py +++ b/crawler/parser_ai.py @@ -5,7 +5,19 @@ AI 新闻分类与严重度判定 设置 PARSER_AI_DISABLED=1 可只用规则(更快) """ import os -from typing import Literal, Optional, Tuple +from typing import Any, Optional, Tuple + +try: + from typing import Literal # type: ignore +except ImportError: + try: + from typing_extensions import Literal # type: ignore + except ImportError: + class _LiteralFallback: + def __getitem__(self, item): + return Any + + Literal = _LiteralFallback() Category = Literal["deployment", "alert", "intel", "diplomatic", "other"] Severity = Literal["low", "medium", "high", "critical"] diff --git a/crawler/run_uvicorn.sh b/crawler/run_uvicorn.sh new file mode 100644 index 0000000..2a53f6b --- /dev/null +++ b/crawler/run_uvicorn.sh @@ -0,0 +1,9 @@ +#!/usr/bin/env bash +# PM2 用:在 crawler 目录下启动 uvicorn(GDELT/RSS 实时服务 :8000) +set -e +cd "$(dirname "$0")" +[ -n "$LANG" ] || export LANG="${LANG:-en_US.UTF-8}" +[ -n "$LC_ALL" ] || export LC_ALL="${LC_ALL:-en_US.UTF-8}" +# 若项目根目录有 .env,可在此加载(PM2 一般已在 ecosystem 里配 env) +if [ -f ../.env ]; then set -a; . ../.env; set +a; fi +exec python3 -m uvicorn realtime_conflict_service:app --host 0.0.0.0 --port 8000 diff --git a/crawler/scrapers/rss_scraper.py b/crawler/scrapers/rss_scraper.py index 0bf1d45..e3d1055 100644 --- a/crawler/scrapers/rss_scraper.py +++ b/crawler/scrapers/rss_scraper.py @@ -3,6 +3,7 @@ import re import socket from datetime import datetime, timezone +from typing import List, Set, Tuple import feedparser @@ -33,7 +34,7 @@ def _matches_keywords(text: str) -> bool: return False -def _fetch_one_feed(name: str, url: str, timeout: int) -> list[dict]: +def _fetch_one_feed(name: str, url: str, timeout: int) -> List[dict]: """抓取单个 RSS 源,超时或异常返回空列表。不负责去重。""" old_timeout = socket.getdefaulttimeout() socket.setdefaulttimeout(timeout) @@ -72,14 +73,14 @@ def _fetch_one_feed(name: str, url: str, timeout: int) -> list[dict]: return out -def fetch_all() -> list[dict]: +def fetch_all() -> List[dict]: """抓取所有配置的 RSS 源,按源超时与隔离错误,全局去重后返回。""" sources = get_feed_sources() if not sources: return [] - items: list[dict] = [] - seen: set[tuple[str, str]] = set() + items: List[dict] = [] + seen: Set[Tuple[str, str]] = set() for name, url in sources: batch = _fetch_one_feed(name, url, FEED_TIMEOUT) diff --git a/ecosystem.config.cjs b/ecosystem.config.cjs new file mode 100644 index 0000000..7e5a0fa --- /dev/null +++ b/ecosystem.config.cjs @@ -0,0 +1,43 @@ +/** + * PM2 进程配置:API + 爬虫(GDELT/RSS uvicorn 服务) + * 用法: + * pm2 start ecosystem.config.cjs # 启动全部 + * pm2 restart ecosystem.config.cjs # 重启全部 + * pm2 stop ecosystem.config.cjs # 停止全部 + * pm2 logs nsa_api / pm2 logs nsa_crawler + * 需 .env 时可在启动前 source .env,或在应用内用 dotenv 加载。 + */ +module.exports = { + apps: [ + { + name: 'nsa_api', + script: 'server/index.js', + cwd: __dirname, + interpreter: 'node', + instances: 1, + autorestart: true, + watch: false, + max_memory_restart: '300M', + env: { + NODE_ENV: 'production', + API_PORT: 3001, + }, + }, + { + name: 'nsa_crawler', + script: 'crawler/run_uvicorn.sh', + cwd: __dirname, + interpreter: 'bash', + instances: 1, + autorestart: true, + watch: false, + max_memory_restart: '300M', + env: { + CLEANER_AI_DISABLED: '1', + PARSER_AI_DISABLED: '0', + GDELT_DISABLED: '1', + RSS_INTERVAL_SEC: '60', + }, + }, + ], +};