From fa6f7407f0c20fb38dea050003f320979cb9291a Mon Sep 17 00:00:00 2001 From: Daniel Date: Tue, 3 Mar 2026 13:02:28 +0800 Subject: [PATCH] =?UTF-8?q?fix:=20=E4=BC=98=E5=8C=96=E5=90=8E=E7=AB=AF?= =?UTF-8?q?=E6=95=B0=E6=8D=AE=E6=9B=B4=E6=96=B0=E6=9C=BA=E5=88=B6?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- crawler/README.md | 156 ++++++++++++++++++ crawler/__pycache__/config.cpython-311.pyc | Bin 2725 -> 4083 bytes crawler/__pycache__/config.cpython-39.pyc | Bin 2386 -> 3067 bytes crawler/__pycache__/pipeline.cpython-311.pyc | Bin 0 -> 7725 bytes crawler/__pycache__/pipeline.cpython-39.pyc | Bin 0 -> 4743 bytes .../realtime_conflict_service.cpython-39.pyc | Bin 15244 -> 13641 bytes crawler/article_fetcher.py | 90 ++++++++++ crawler/config.py | 23 +++ crawler/db_merge.py | 72 +++++--- crawler/extractor_ai.py | 39 +++-- crawler/extractor_dashscope.py | 35 ++-- crawler/extractor_rules.py | 8 +- crawler/main.py | 38 ++--- crawler/panel_schema.py | 2 +- crawler/pipeline.py | 150 +++++++++++++++++ crawler/realtime_conflict_service.py | 83 ++-------- crawler/requirements.txt | 1 + .../__pycache__/rss_scraper.cpython-311.pyc | Bin 3677 -> 4945 bytes .../__pycache__/rss_scraper.cpython-39.pyc | Bin 2018 -> 2839 bytes crawler/scrapers/rss_scraper.py | 96 ++++++----- 20 files changed, 592 insertions(+), 201 deletions(-) create mode 100644 crawler/__pycache__/pipeline.cpython-311.pyc create mode 100644 crawler/__pycache__/pipeline.cpython-39.pyc create mode 100644 crawler/article_fetcher.py create mode 100644 crawler/pipeline.py diff --git a/crawler/README.md b/crawler/README.md index 0280af6..4a4991d 100644 --- a/crawler/README.md +++ b/crawler/README.md @@ -54,10 +54,20 @@ pip install -r requirements.txt **事件脉络不更新时**:多半是未启动 `npm run gdelt`。只跑 `npm run api` 时,事件脉络会显示空或仅有缓存。 +## 写库流水线(与 server/README 第五节一致) + +RSS 与主入口均走统一流水线 `pipeline.run_full_pipeline`: + +1. **抓取** → 2. **AI 清洗**(标题/摘要/分类)→ 3. **去重**(news_content.content_hash)→ 4. **映射到前端库字段**(situation_update、combat_losses、key_location 等)→ 5. **更新表** → 6. **有新增时 POST /api/crawler/notify** + +- `npm run crawler`(main.py)与 `npm run gdelt`(realtime_conflict_service)的 RSS 分支都调用该流水线。 +- 实现见 `crawler/pipeline.py`。 + ## 数据流 ``` GDELT API → 抓取(60s) → SQLite (gdelt_events, conflict_stats) → POST /api/crawler/notify +RSS → 抓取 → 清洗 → 去重 → 写 news_content / situation_update / 战损等 → POST /api/crawler/notify ↓ Node 更新 situation.updated_at + WebSocket 广播 ↓ @@ -79,6 +89,83 @@ GDELT API → 抓取(60s) → SQLite (gdelt_events, conflict_stats) → POST /ap - `OLLAMA_MODEL`: AI 分类模型,默认 `llama3.1` - `PARSER_AI_DISABLED`: 设为 `1` 则禁用 AI 分类,仅用规则 - `CLEANER_AI_DISABLED`: 设为 `1` 则禁用 AI 清洗,仅用规则截断 +- `FETCH_FULL_ARTICLE`: 设为 `0` 则不再抓取正文,仅用标题+摘要做 AI 提取(默认 `1` 抓取正文) +- `ARTICLE_FETCH_LIMIT`: 每轮为多少条新资讯抓取正文,默认 10 +- `ARTICLE_FETCH_TIMEOUT`: 单篇正文请求超时(秒),默认 12 +- `ARTICLE_MAX_BODY_CHARS`: 正文最大字符数,默认 6000 +- `EXTRACT_TEXT_MAX_LEN`: 送入 AI 提取的原文最大长度,默认 4000 + +**增量与地点**:战损一律按**增量**处理——AI 只填本则报道的「本次/此次」新增数,不填累计总数;合并时与库内当前值叠加。双方攻击地点通过 `key_location_updates` 更新(美军基地被打击 side=us,伊朗设施被打击 side=iran),会写入 `key_location` 的 status/damage_level。 + +--- + +## 优化后验证效果示例 + +以下为「正文抓取 + AI 精确提取 + 增量与地点更新」优化后,单条新闻从输入到前端展示的完整示例,便于对照验证。 + +### 1. 示例输入(新闻摘要/全文片段) + +``` +伊朗向伊拉克阿萨德空军基地发射 12 枚弹道导弹,造成此次袭击中 2 名美军人员死亡、14 人受伤, +另有 1 架战机在跑道受损。乌代德基地未遭直接命中。同日以色列对伊朗伊斯法罕一处设施发动打击。 +``` + +### 2. AI 提取输出(增量 + 攻击地点) + +```json +{ + "summary": "伊朗导弹袭击伊拉克阿萨德基地致美军 2 死 14 伤,1 架战机受损;以军打击伊斯法罕。", + "category": "alert", + "severity": "high", + "us_personnel_killed": 2, + "us_personnel_wounded": 14, + "us_aircraft": 1, + "us_bases_damaged": 1, + "key_location_updates": [ + { "name_keywords": "阿萨德|asad|al-asad", "side": "us", "status": "attacked", "damage_level": 2 }, + { "name_keywords": "伊斯法罕|isfahan", "side": "iran", "status": "attacked", "damage_level": 1 } + ] +} +``` + +说明:战损为**本则报道的新增数**(此次 2 死、14 伤、1 架战机),不是累计总数;地点为双方遭袭设施(美军基地 side=us,伊朗设施 side=iran)。 + +### 3. 合并后数据库变化 + +| 表/字段 | 合并前 | 本则增量 | 合并后 | +|--------|--------|----------|--------| +| combat_losses.us.personnel_killed | 127 | +2 | 129 | +| combat_losses.us.personnel_wounded | 384 | +14 | 398 | +| combat_losses.us.aircraft | 2 | +1 | 3 | +| combat_losses.us.bases_damaged | 27 | +1 | 28 | +| key_location(name 含「阿萨德」) | status=operational | — | status=attacked, damage_level=2 | +| key_location(name 含「伊斯法罕」) | status=operational | — | status=attacked, damage_level=1 | + +若 AI 误提「累计 2847 人丧生」并填成 personnel_killed=2847,单次合并会被上限截断(如最多 +500),避免一次写入导致数据剧增。 + +### 4. 前端验证效果 + +- **事件脉络**:出现一条新条目,summary 为上述 1–2 句概括,category=alert、severity=high。 +- **装备毁伤面板**:美军「阵亡」+2、「受伤」+14、「战机」+1;基地毁/损数字随 bases_damaged +1 更新。 +- **地图**:阿萨德基地、伊斯法罕对应点位显示为「遭袭」状态(脉冲/标色随现有地图逻辑)。 +- **API**:`GET /api/situation` 中 `usForces.combatLosses`、`usForces.keyLocations`(含 status/damage_level)为更新后值;`lastUpdated` 为合并后时间。 + +### 5. 快速自测命令 + +```bash +# 仅测提取逻辑(不写库):用示例文本调 AI 提取,看是否得到增量 + key_location_updates +cd crawler && python3 -c " +from extractor_ai import extract_from_news +text = '''伊朗向伊拉克阿萨德空军基地发射导弹,此次袭击造成 2 名美军死亡、14 人受伤,1 架战机受损。''' +out = extract_from_news(text) +print('combat_losses_delta:', out.get('combat_losses_delta')) +print('key_location_updates:', out.get('key_location_updates')) +" +``` + +期望:`combat_losses_delta.us` 含 personnel_killed=2、personnel_wounded=14、aircraft=1 等增量;`key_location_updates` 含阿萨德 side=us 等条目。 + +--- ## 冲突强度 (impact_score) @@ -93,6 +180,75 @@ GDELT API → 抓取(60s) → SQLite (gdelt_events, conflict_stats) → POST /ap - `GET http://localhost:8000/events`:返回事件列表与冲突统计(Python 服务直连) - `GET http://localhost:3001/api/events`:从 Node 读取(推荐,含 WebSocket 同步) +## 本地验证链路 + +按下面任选一种方式,确认「抓取 → 清洗 → 去重 → 映射 → 写表 → 通知」整条链路正常。 + +### 方式一:最小验证(不启动前端) + +1. **启动 API(必须)** + ```bash + npm run api + ``` + 保持运行,默认 `http://localhost:3001`。 + +2. **安装爬虫依赖并跑一轮流水线** + ```bash + cd crawler && pip install -r requirements.txt + python -c " + from pipeline import run_full_pipeline + from config import DB_PATH, API_BASE + n_fetched, n_news, n_panel = run_full_pipeline(db_path=DB_PATH, api_base=API_BASE, translate=True, notify=True) + print('抓取:', n_fetched, '去重新增:', n_news, '面板写入:', n_panel) + " + ``` + - 有网络且有关键词命中时,应看到非零数字;无网络或全被过滤则为 `0 0 0`。 + - 若报错 `module 'socket' has no attribute 'settimeout'`,已修复为 `setdefaulttimeout`,请拉取最新代码。 + +3. **查库确认** + ```bash + sqlite3 server/data.db "SELECT COUNT(*) FROM situation_update; SELECT COUNT(*) FROM news_content;" + ``` + 或浏览器打开 `http://localhost:3001/api/db/dashboard`,看 `situation_update`、`news_content` 是否有数据。 + +4. **确认态势接口** + ```bash + curl -s http://localhost:3001/api/situation | head -c 500 + ``` + 应包含 `lastUpdated`、`recentUpdates` 等。 + +### 方式二:用现有验证脚本(推荐) + +1. 终端 1:`npm run api` +2. 终端 2(可选):`npm run gdelt`(会定时跑 RSS + GDELT) +3. 执行验证脚本: + ```bash + ./scripts/verify-pipeline.sh + ``` + 若爬虫未启动想一并测爬虫,可: + ```bash + ./scripts/verify-pipeline.sh --start-crawler + ``` + 脚本会检查:API 健康、态势数据、爬虫状态、资讯表、战损字段、通知接口。 + +### 方式三:只测 RSS 抓取(不写库) + +```bash +npm run crawler:test +``` +输出为「RSS 抓取: N 条」。0 条时检查网络或 `config.py` 里 `RSS_FEEDS` / `KEYWORDS`。 + +### 常见问题 + +| 现象 | 可能原因 | +|------|----------| +| 抓取 0 条 | 网络不通、RSS 被墙、关键词无一命中 | +| `situation_update` 为空 | 去重后无新增,或未跑流水线(只跑了 `fetch_all` 未跑 `run_full_pipeline`) | +| 前端事件脉络不刷新 | 未启动 `npm run api` 或 WebSocket 未连上(需通过 Vite 代理访问前端) | +| 翻译/AI 清洗很慢或报错 | 设 `TRANSLATE_DISABLED=1` 或 `CLEANER_AI_DISABLED=1` 可跳过,用规则兜底 | + +--- + ## 故障排查 | 现象 | 可能原因 | 排查 | diff --git a/crawler/__pycache__/config.cpython-311.pyc b/crawler/__pycache__/config.cpython-311.pyc index a9256d8258b2f5333d99d4e3f4680c32e890fd64..f1aa1f47046b9201450b593f74c9a9b1e2760332 100644 GIT binary patch delta 1459 zcmaJ>-A~(A6u-7_LP8u8Y(838iy=V}UOym~OkZ(29q!?t>&}6ZVVOL4e*I5korq4+c=mhuAMf3Nw)gqU z?&gE(nu$;-D7D*c<2;Y`k%Y;R8i4s1Woq%AFd2vQKd_YmQ71PkNFW%Yjvng(QXzGIr{)Kn=m_+;g-M*E zj;nC>{G`DcfnjhP)8-{!fWsCLh9^mHYmO6+=%H91CgNJ090g%249Cfn1q67w7AEi} z7EE2irN3`2?X7&z-Kak#u_alo+R_B>z zDlmm>#j!iNDdgTRsiOGpoFaz}wh@UJB^w&^3%t+f6Fw6BUfYPP+vdSFK3=qW0{(Hq zYYl!Z8}SU>%JFkcfhp0$OJWt~c?+wwg<%F{V0q$rqzma8`%A;AEYPUTIhdth>PqK4 z-?b!lQ7)z9vOuNnO0#9LwuQFk*7?gzmlJGboS${1nbKI@Lfvw0oJlg~6l30D8n&5+ zB-4~)nr7{@_6%D#_x5tl4rAJ8Oi89T#njH))9mS`&K4~htpZtx1A&2WHAaY9dEQUi(=VSm) z`S>o;i(TaxCAo;j7uqk&O3RmeWBm*L2~+C^nJno@m2@OP@#1@h<|r_-v(nu8rS@3Y zLRZ}S%$#J~Qfyn&a4}`LI74S?plVLT&I~-4hL_T?F#~He(2{}GX?P|B8`IF7(N`wS zR}-)YkF;XuC)W=nvA3e0GfE4Q5g0JfB+ A^8f$< delta 289 zcmew?zf@FxIWI340}w$zp2D$JTpMGLNfqrs+URq|lUP0wA4x8Nkl+v73 wyP{yAG$RlfUtpL#ldpHOJ-?!+I4{!&1{l%6`N2ei2Pgm|8aP0(C=jR#0OWj2k^lez diff --git a/crawler/__pycache__/config.cpython-39.pyc b/crawler/__pycache__/config.cpython-39.pyc index d85e50a6fd36a0aac1538c5dbd1eccc19334b6e1..212789980fa5bf1a0b887e1ff8c5a5c9e5e24e10 100644 GIT binary patch delta 1154 zcmaJ7%Q6yBL#uh*%QZrwJ8vTuvZ}Te2r2|elr|Nz5g>X)Du>Dn%8ff5N8*GXH{!s78(es^mQYU2YQEX`GvAx{ z&HS49=fE**cz8&GVHUo#7fnU^T^c(F_>`Am@}F0a@ptG5MHh@#j0}^JHolIECfQ9) zvWFzdUb2tuC!^#5d4fDi#>hc(=sqUnWFlOjB1!V}f?3={4wGkYD8?W`;x zUJL@CpUhp=W{x-Y*yO7wP?zM5m$+ADWy+t2Fdqd_uz@sWpdmDZ5;%?$ zV3~-GLnO5VCS0n)GkqQa+cP6;bM5wwdR$gsiJVT_%~Ce6>su zeYP7gLz=R0wp)VKOBiot-+eompDBdu?|?&L^u>xts;@Zo6b6&O03d~rt0o##N8n4S W2^}MhVM>%^V8Im`1FNr%8|J^m9$zj1 delta 456 zcmew@eo06>k(ZZ?0SFp`mS(=^VPJR+;vfSyAjbiSi(4jY7mKAcr0_0cj1mCSd_Y<- zg(;XplYinj4aTU+@{G|;ntYpk7&kLc-oaccq*)ckl3A3P7sU=DGl7iBvTTk(h4E|w zKym}yYf-R*!ctD4`A_zAJ?rj#+S(12VRsgbVlA#L%1i`HJ?)?Qv}5IyU9+Mlb8;90 zwOVud0?AGe86dfgqYp@$a$Ypx0GblTR$Nk)nVlNNmRM4fn4BHOk(-&5S&~>(8O5HP zSzMf%lUm9QRQGJbj;P6IT=4=?Y$Zkc`31#MtYA87@=~sRMxn`~+^Y3a>_w@?`8j2& zx7Z32i&FDSqId#={JmVAL*j${{X=fCr{V*)BF76KA1j2w(S%mR#3%zQv7#LOWD0KRmCP5=M^ diff --git a/crawler/__pycache__/pipeline.cpython-311.pyc b/crawler/__pycache__/pipeline.cpython-311.pyc new file mode 100644 index 0000000000000000000000000000000000000000..d06eb0876b064b2ecaa951b511378a6c5eb10c79 GIT binary patch literal 7725 zcmbtZYj6`sww}>*^tR=fY;2HiY@z^`4IvQoxUsPfghvu=5^w-r;Tao&UUz1MF)Qsl zxsb@j1c#7V;SoftAl^X0xtHuZkQ>O>t%TY?J3~!XR8vJ2wL!A|=SHdQ4{mf)GGn;twfdEt`w;lw*uhN2Hqw0(PfOWW4=1`lbSyn6NF&C^rI-}qs0 zcrtu->ccA!zy2hp&h&d`X;Jx-Um!9QR{+-S<_W?df2 z(c|;>x?Mf21MzqDP}cad&nHih{V@23b{ZLTn@uJXq?4skZ;?Rx5ucZ`$wgwl)9rTd za|887KkM>&oo-Rt74W+$7@}@#b-dKlwNWHmUfSenZRuJKbGQeGX`pa!_@e{!?Iep`AEtNOJbe+R+S`7??MI62uvGRQ+S%`h9q0A2uHFGr@zRT(T}SlmeO{LGvP-)L{M3;Gr{52o z$B9$)7~qeY58G`JODc5Ak{D6NWm3#oJna46 zxbU8FVa!x^@^{C77h!l)tzfF<^s7=SR#YAc8XYRpvk6Q(d}|Aw-#JgLQi z!`|-=wf79Q(e>knoS~LCEEf#RlNhe4A7^7lWv5q1Yws1+CFMYnsJZMX)ah?1%h8@x zdi3bglgK&Rqqwd(iOG~Dx$Hh+;Eau9FGB{2Cz=GJi6fd~rs8lQ^8>z*l1dl`Z)OeH z>5Z0>mNnQnWx5u-{F@p*D%%w;%hlg3m!tBtI4ZA{%4@W(wemly%&qg~f10m^vZ!>i zl!p=3GN2; z5Y$DBGqss-_UT;hKxw8tnhzM{hlx_$+a91fbgFt@jci9gPovka&MQapGG1 z%Rj_N!YB`qTYG$-eNNWl_Av~_Snbw>)Bsdc9?v@Y*$@CVfJ#%hPsh)Gl!#o9k6cci z8%jtR4(J_{PX*_v^iXskB^I`|bZ+ci|KdyS4g`xfx9<{>Yb>M=v$PX1STF7KIJ{Ip zGcEb6qV-$aTeh`tceHGBv~B8aY2DJ^CgP1E-ZYJ%={8Zxx>z^$3vLzFOu*xD(gV|I zABY-%V4vH??59W(?>?eh+}POM*m$J0Zr9Q}&(b=wt8Qa+-PY#1&W6Uu#%@qRgM2f1 zPmlliI9&@Ix&n(J&-!RE+w}R56X$QIER-^vJ!Pe4tF1zm`4~}w&SFtT9dDl{e2kB(?Gd8H>jQ?7*$xG6@| z=Lk-hD0hR?*SZ*&7gYE5P@)_d(&+FP4akx%#@FkkJx&Qc_}xzEp<<7?yuChA>tr%W zmPSZVln2-zk$~~Nh)!1kjYda|%}68kBpD?&6GMQZW-F?6yF4yd)JhX39Zv6nD1*9! z1${+1EQTlplc=EJAX!n9o&`nH3xGFcgbd-2+(TP|B$^%Rj=+#i8?zG{(9y%7oi&I( z&=r7hR+gNXol{>ho-s!Id5c}J*d?Omfs?X@2#M5i`RTr;^8EI5-52(r*~=~Ympf*@ zs!gbB`@ZUhdsQ#+Ra=CrExcu`VA&cr#B`PcIAn#77U zH9w&&+zK5=J5F^(Rz_R-;zdI7BCdGp*s}2=zUY|=*=^NL)x^Nv8h+JQVbxY{RVVjy zCtuVRZo6+Xk32Kle5(1~HK*6`CaYkwMwi_))p4e}Sb-&GE*`BvRUetho9hH~U3h)0 zummKT%t=gZC{M~Xh2{6ls-v}+7`|+=P_{UU$xG_)SKEZ@l@s#Yx|_N?>+f3lRojJC z+xhBFp}G?kS3Rg+7*WN_s!oSs8A@94SoNastC!uYUiQVR8*BLL=Y{I$p}l;;g#~98 zaJCk{yj3V~9f9AY`(;)bQdpkLV#St`Hb8WR<$#O2rxEDE1H{q?m)k4e^JNMk-73hmaOvj?SYC>2VBP zl{z1Yg$zLhz(H_OK^c7%_^RD{k0E5lv7n(1+jAs@+?z3IilXoNv4?yNbO8X;%yO^= zf0u)HHf-~k{NvJQNh$8l7y5DNV|YRzE$TD;8!sizeV;UU z?LV#!n)|QFp39E^FDv|=x%5ag`B!Nsx;bZJBu)AD%C{?6&zQEJeoh_*WZ|#yC_p^s z9BWm~Wr5wUptZn3k^gl7T~5yn7L)ivY12#)98~4&4;Lx~Inqziklz|C z3KsXuA?j8V_zI@9 z#v!1F;%6ux0zgrEJN5&FNe_N_>li|s7r&gkeKvmVQhel{$y=XE3Ov(bMKCeLIWb2- z5M(eeHXwyp905Oy!u>G#`b@D<%}1yJNkO_KWCaM3fM^=l-h2W^QYoqGFH7Nf?) zAErW#NVzN5wbH=s>D+8hLE8BBFY{~^fBnql?ORjV-vtGy-u@g61z0*GHg%w9hn^eD z#zqt8uEfW`P6wh;G|BpeIFqD7))_Qgk+vsCAbbfy%=o#nw6;ShQfxp06xr+}DK`ry zJrO^C5tOrTXlvinWd%qczj!5W5s7aKr#D-lN}Rho^^!^7S$S8ltnX9tf`a0yM(dG{ev<5ECY(UWtr;C0K2=s20?LkeZ+?)ce%gxYxQ4>1qXat3&xEtL9C0j`f zqoa${EC@lb1RC5xAoMJ$B^(6^g(GU$pKpoN*OVpua`i>=PUx#pNe}=SfVV*@>refXkL8oMp+__Uo^F`pWo*iS7K-=Y^%ub4xq!wDOh> zf@K4M)QSeKqG61_e(2Lf<1V;v*xLkq8)tv*uIw+WKdbKcb342FjjstCU*juw3l+P= zo6{o(?$mGrIm>F^vRZ&#Q8KtGdxUx7#6oxfJx?x>d z7gHPHKB24_Ss8ix)asGd(fYBbkDrb{eV-^AY2%4v1^fXMrYN)Kiyl6+7UENIQ)p@iYpe(g^5_D)6bjuD;wc zL$60|%_u`bHBkXE4puq`L^*`{r7!9C7>%w3(%1G2vBu*gA#$*e{teI|GLJF4A&2`E zjwcmzTmj1i8G+&2K@IxFutnUHQVd(d&7B@#3%Mtyq{@JoL~4^5yrK)G*X+#t6!ajI z7`(HQlvcn3BlNzb37Bdtkby87}6<8uf)Ew=KS_I3}f@yJ@sS#7@!pleGBc0*3g0ehm P$_Yu%5ID0eS^xh4+4Z(T literal 0 HcmV?d00001 diff --git a/crawler/__pycache__/pipeline.cpython-39.pyc b/crawler/__pycache__/pipeline.cpython-39.pyc new file mode 100644 index 0000000000000000000000000000000000000000..ab5dd1d3602066c94e21f3048f24d0fdc5d3ec13 GIT binary patch literal 4743 zcmZ`+ZEO_B8Q!nk`#zt~hiwpTw{2)+CD%5|AGS4oImejxkMeN<<+P z22CBRM-g}g6rB>WjKfR0nvCEGlzf965(b2&9O(TxPM-mBDTiWHGO9!I1O8M!5_-9o zME)yqH&mJZXzupD(mQ8LUmh>t+FzczS^4t*!@Fn8?;bB5JE?s2&Id~Q_QCSaQ><7z zIy--8!qc9fF3lV$jn9-0PgJhlfbOMhr^?rFd6nmGmQT)9i8v?m=xJM}_VqMYN(Vo$oVfg8-`k;im?hWK zBDevh*_gI=15}P?P23{7fpwZ;X#ECgx8!U+Yifq;>&xd13|;)|*Q#4P`!>3P&aK_* z+RolCYDdfIYQJV-S0>ob6En@zA!4m5TnCvJ8%Ww;fMKQ!QrMwBo_H| z9Foop$cJR)*Lf6}5DNlpAvppHqR@nJYjNXMv~^2w-)LlA*0ix{uk0Ji;nBF3%fZrUG`4NSR@Q_N${B1AWs&P2!WzQF znx|1~;axvnm~7sj|0@mLaz**&ZjG34DV}-e(}1QM7!`?Xs2f&2zg37-6uMi@ zAUHwb1H8<~c!8JyBl{A3E0kkAQJ@a?qffVS#&Z3rU(IRu5TSc1Er59Zi$P&wgZ&oP z!{JEW=7#xc*gi$xN+Q0Oo8(8|0E7??F(VZOp0j<>LZ0>`3Hge`bfCzM@pN;MtfzTwQA!K*6GbLOTRvvo_&cn=rJAGfiZ%7vc-D!@+j*9R%cA&%O2cg5UAO zc+n!C`!g%W=*YPl*zk2t9Mi;^=1<7i#1#T|)CqWgq52&h`Nz%^tWBcG7lJ5K2syz? zo@^Z5V8@)GT{kAcSImx2*E_-K1ZqIZq5v(!rsM<*VJAG?0QKKYB zCveQ7?3gV@OP%Ob$1GBa?v2uQ0MDVsj#&hb*|NL^tJm4B%*`GJe4IZuTRM2TeDY9v z{A79h(EP`zUnswSx_W8<-0f@SW0QctmEYZ~6t7oikC!hTC>=Rj{&1!|aUFoLG&x?n z^96vflFnxOHCr{ZmW3^)P1%7*;KMs^O6A&Nz+gbZ4ea2;3G-zz8N{jj^_{&Nd)IB* z+NDxtd$nu3OV3|D-eVIDV03_FGpdPqTk~|Qxeeiqm=xuFpZ%=_QL5c-%S~I!NctX*sFaLdz3zz^=oIn3@ z`TTt^C|lt$f82iUC{uL3F($@g6n1)yCN7Q^!&O} z8dTlN4rED2V_=mtG-x5)qq;eebwirAb`bM++6_Se0jehyJqQhVON@M*mT6Gf zOj6Ojg`aiQXXqK-PKDTnk*b*^uAtklL>DO~!eY3BZKY%=sK|DMwOL?9zCq1Wxbwuf zpyVdhT0Pi7UJPk7>t`T1Af5M1PFg%BWKI1?^JctEf* zIR8%6`UR4EIY%IW0ZV=(l8F{#P#Xmde4|k3#GJb67-CqO+lzvLy78JWMDaCucCjAn z>Yb1iM`0A15S%&`Si`}0eeMg;MQggy8gA#Xxry1|0QN(R{d4~W+VFH=IyM_;lFvzt zEuXUXH)d-s*b1{2F;rIs!T{sPVEhQv#Zi4t7Y7}+3b+h5x1t7UmskV&W@n=lbfS(A zB`1V8g+ijx;3NQp=SE+FxGXX1GVwwZ@=(2<7;b?6ji`xQEG%&nsM%Rk%NjB8BI(po zx&jubYLq4kNc|O{Z6l!VGJ6T2ZR1mD+gxbe+h~RWqv1`lmW})`K=Z^HZwio`nm{8E z*=}?aj&BU|p(DX}!jZ;+I5{GeS36CJ-vL_dMW?|lk=1sSBU4}OW+w^}X>ghbM26yL zU?YVT_>~WTS}h(aT|ZL(@aB`G6$mt|U)8F3q}HR;rRx{Ur@rt2i`Bo>ssN681pDFL zaW4t9c}bwHH2zNI^Vx@Y4>J-zb-4WD_!C`UZdE)s4*e)nQw~g77i0wXR)+w%YMtk1 z57Kl~yi>h@u5|EXY4T`g_8N0wb*oZ-!gBd*NIi^U>2{uxI5nT6O!~pT{Z9@HpHHVE z$UQJR8{`Kt2)ht??!#39veLu5=c?~rq=U1n()bzgnn0bqawEh#F2F#CQa*j7G zpiYNEy*)MwR}h+ir!LpLJv`x=0SD;qWDsG)hLBE_-YtT2%1_sKZR%4>=Ps0rS85Sq zcB}VKfU-k*zC3lc`p4_u_9&k(LWEwdtt$iq7OnL0h3c&X)$2FB8n~w&yr?|87K+!i zg_(Yw>VpH=P7kSYy@6~R-A#a4)|So!MaoD1|UoK667eLpC&dS z-3k#>(Sv5Tbgcx}93xLDqIVx6ynBNKl z+WIrv9u>0Y{2usr0}JE$vi1-rslelL=%oSa`3z9_Y@VcXN+gy=R(VF2V3X)DSxTPO z-MT%b_Gi&Zs-bgRUpH_UbwgL*x{d0l?#adjf zp_T+dq>>g~aUl0wtuUHe`yf5EDhOPg;I8F_qsbLxf>EgC(pF~A3aMbIZH%9Krrlw}0?B)=T^ zLbDKn_w(#K$v2B}XaoF0NI)HZo)Z(mD?*@^`DI{<+1ky3vb`s-1nA0 z4!9_e6H`dFH`-C6k+d;wGg_OC8JjrC^i$g}oqp2tI03-oGq##Z~m<2D^Hes^j)lwMTZ*K zDXfMjniT4xUGEY~DLd&?o7fXqcGLcOLI>#JNoAU-dqCo)NstWDVJUfDN(_*ss3|3T z*(T}3qG|Ba4ElZa4bXG?hSax6i4Br%4z;DIm9^2#u;+}IpQMC(jCQr1wGZuQPtXxM zO2=ls1On?)V&l1Ac^CXd71zT@NL?-%x#K1EVkz25_T+XJoh7fuh+FD#-LwteVVYxU ziNU1FY=-iL!E{qI*{H4cTU<-)CNq})LKJ1`JftQ@)L=><(+p<1-V8U?0^3R?bt9S5 z2N-Vx3*IO;melOezrxrpOs3L$!qM!6#TjoyC#!iJ9%}JQ3>hbB*t{8~5g7sFnx5qQ z#D$WlNQ3yiv^+yst zh&Jm1KPEgbF0Z*w;-a^7H#sCOl)kMqG0VLv2&-_#=XQ5UzBYl zqq#qqm3wLrprd7P`6%StkSs_8;srz{qD?f1%f!p&wd4gcRbEPth;!vNq*wf+{L+T) zXmAt}TgE1Qn*`j!mtJwa;yyVlj(uI#Mp2MKU=m%gok{VHIJ6KxLrWUK?0SHYq0&M0i2BMcWJs{eZvt;t{*CmBYwIhX#+lMb%;OTydvN$j zw8P?0>(`OtT(D|{ka2PBsbhf_G}|vc)z4d)!bza}tBPVenHZ z+k>b>3?eEJLx^F-^7q7|GIH0)tx-GwC2xTNlxLE<9>;dQ3EgqUjq0KjxhHaqzKt$= z#BZt%a#l2M*p&5yAOe3FvHd&1(G&8FDJe3kz`61|9_njSCKSi(_^6)-XmHl|s&|r1 zdM3RS1hgU0df*vuQXD^TcLLKe&j}(Y2qnWlKIMdH#0g)C%qv$4Xu&+0Q)uA?GMb(6 zz@uow3oO1Ao+d;&teBlX#VH*BEm0kW-JaXu z+Sbusb@%G=yYKvH@%x_wt6u5qs#=)4yfFV!)c|FNqp=ZYI`+NWGYh9?m!>Y>eeZ*+ zPK&bNPo3a+0HBww;u%xx&lrZ5PNtawEvA#d1FG)!n9mK;K4>n^pc>NCN$neby3O41 zazk(frsX92$NBfbo(sfl2u)NxV`Yof&_j>>9EjaWM{`%V*520Lav)z>MQFSlaTXyf zMwYNVA1Rc{qamy4MHt5X(vup7h_MP8RRj4ps0Ph^7hEoS5!?|KzuZ{7`5Bm~cs!(# zgox+i@BP9+2*y4VfZq_=N{T?wFXn!}IZCQ7;>tF_Eo#xt1Tzdq+c>xQgL)Q=TdGNq z2-j{Pd&QR8a`JxeVC}!ibC|Y=Y4xWVA7JX5mX5aO&bC*y=7U=6!LH_g9c`_A4cd7C zG10qeorjN$^wz=bKSj_!q1~Mwx{lxm;V5sZaB@Y1hZ{M-GE$% zcE;B7$DCa?1p6l~_z!VnkeMS%ZkZ1{I~#+oOgM0X@QjgVHva&O)WQ`pH242K{ro#% zArHRmw;i5@Ku{nzNN%6prKns@ddv5#88rF{0uMc(MZAspF5(QLz67X-AFRzNocmkb zo+jTCUu|n9*Tnkg{tDgdYujs9uhbf*8`|P|(Y3wr@MFIIk=P$}F3V*3&LxAsfNRIm zkxwR2^+U!iXu6K&=mvL0VZ54f@oc%>OYAgs76|0(G9Fn=F)a{wryd3=B58n_9vtKy&anW|6G$S-Z-GKK88frKo2 zE?0U_oY`6CzXe7=71wq)Za&}!HH}&cP0JVDN9Y_A=tgYjP}Z+!42Qpr(i5VzAy$lD z+@N6%3?$6~-X}U5PCtoN@?S0Bbu{rNO{hk3uc0j4fa(OI0@07^VolRcXg9i(v`iaT zuR)c(cy^-R?_|;j%U?%_P$kcAq{T8#mcTh&UdcnK8b-*LBe!3s5Od}Fx-oaPGI?*J m(KW;kL=J&_p@u9o-&Q#$k>3(u?Ak@%7JuKx$&`!g@eOOrHdTA)~7LQ8;DFr-502Q7uRHg&L%mUa8?P4bd`yRUcm zB~94ZhZBla38XAvT7oamfE1)!f?5zq9qSi3APkJUGmP|)M9LrPIF2~TIG%GiBvL!$ z+j+k|_uO;OJ@@N(&;EMozR{$bNJJ(04ex$3e`?#YWGi{>RL47Vg>_Sz<{pXtxsJ1mWwQ`|DpO>z(cC4P>8lH2uXobd~jUW)@vP zRV|h8cm94o_>uy13iypmbL<}%#FdYax5ZVy7IafxS zoZgmoWQS91Ns~LABP|_7bxyTRU($sJcOhgJp&g|OfU_v5&cW7?$cK);BCC{9aVLV9 zOrKdQ@HJR0A@~t?BlICy2sXk90taxz+|aZGYHrvKxr zxd+8BBixG+M>vY`6$BiQ8}4UTZcx=UK8d>P5w1bd5n2(72nP`s$CF8j$X!1-Mon)d z$G`x#X`Z^WWxBqcYPnAJ;$%v!iCCiB(S;&b^o@)6lE<8mOWG?nphRN8?P$IQt2jl* zZE@nH0x3&y$_Vvoa=}+7LiTI^LZB=QIiLj#p)#ICLW5ek5GnhG90EC74isYLAe1F5 zPQyKtElo(}kR942?SHTwwtaTEnxK&ijn2r$5gMH!<;a3ECUg&a1}H;a)b`sE8mEa# z*$&ePxbxf5;z`gY7j!Yu1w@+~n!+|=@G zmhF(0qz#kuKDit#$L-h@;c1)Lu?eEfl#~-zjUAdGFq&oHA`b6Hc%xOfl|m~bS+!8w zV%1H+yjk@E)3nKIs4kn4s*SXHh8&mZa?cM)Eq1&DuU0z&xMDhjE^d?bU4F@Gnj&Mb z5viOc()g#XW;-c5hH13^*I}ETJT6W9z1DkS!pq6SNj>VlJEcz008y^o!SRe{h#L`G zPJum_Ar&_<%JV>sr6QbuWQteOmWupFXvH^SQ`xOyx>@2(f!)Oh3_iA~ zFxdzLKV$qjS__gw%HfY9+>Njm;M^eqH@w(7VDwmnjAz4(&96d(plOtNj%9t^G=Zi= z^Z-z^HORk)e!h-C5t#GgYNN6|n>KS?Ei!Jd1ACxeQQ(h*Nj3sa#rQa%#&TS6y=fU7 zDf|iavJ(M!Ki`EQNcI~jRRP@SExmnvlr5Y0Y}_iS$2-HwI_?UE9kO@@=^iJm!1w_3 zt`J2KX&pLw3_+Y-K_-H!Ow#~A?y8S}Ne~v<`HWCrqzvTVzI7gz6z9hSKzY~l+x zF|bJqO=77Pl;c{mP*V<&ibNCTpbd;=`;W-Y60p%L$V$+85<4$GPXn~3Q4*Sz(43)x zMyVWzGamLrqqV~lcMn#DP2CI-7qCOrj=P6Cr8fJv5Hl@{8JbTbVEi6%f7 zz-)&TKo{aI=mM4~v%GXQtYz=ERde^=GylZY`~y>S4?J>i`kV99KmOD4({o2==O6y= z{N%~MynNsM=_B(mJUTaZ|9ewEntRFdXa426u2N`uqc0Ab8 z+1cIM$&Wx4bOyOnfti+CDDo#kv!{gF!KziakE&eXzY2!oh3*CI4d#luse9*M{C@YU zOr@1y;HGxNKptWY{M+En^&c<{%?&YqB+m`qOGE;u#q-5%Ju(3h0pyTWH3uR413Eg1 zjcS%^cu8ZX8(&gD&L{ZbR0S-(OlBA*Bm_6n&y7OfG$1c6z*QdNtf;9u<_4|tyxwoP zQPuo-vf$RYa{omri*%DoddC#Q&n8MHJ3qPrlXvduDqL4vFjl;fi02S!;9|8|)~K@lV} zRHkJ=kWri@;R(VU8zcQh!H_>a+xe*kS%nCyAwl0YL~vYQkTg{mGhtlZ?2+kk$CJ2;V`dN01So zL%0;7;>6kqEBtwsXA!=KfZI=`zk4!1{u?N`ks>#8%rs%+2E94nk!iTlj^jVTwzyjS zMTGSTKSaR2#9v1EWctV-R8c8X$CpsTo5`C5u?ui3L9QrV3Lhfhbb18 zM-ar7jcd<;hM+iK>R3&_>O9raOI~;0>-YzxxqrO8qi*Rk#`%JL`YES%?Sb1aaFEW6 zc_HP6Q|`qrF2QeMS2*5`U(8H40Ef?jY~3;}RpZ`w$a`yR$aBs=*LILMoz?4YQZD=@y>yB5Gq@uGF^w99#YZ;g<+Y`!|;3OnjHB z<}iO1>&^gV2!93T^FB6y2fDM)nJY^hevd{0VGEyaFqLjFCk_G^J^kvFjfIm z(Hn7lc^Od>rG7N=<#me>U=cqpcnYBw0S_JTM#v+yB52OLS1+$zgW_iZV7W_0jd@=| z`%pWE+UQ0@*ICZW1MP}0!KE)Ekza|;Bj&V#ZIFQ$re@FLv_62!*Ad=CcpCv%DH}GD ex&qlNM2+*yU8mgDO{SdFUEFtRs?+IM-}WEzE|tOn diff --git a/crawler/article_fetcher.py b/crawler/article_fetcher.py new file mode 100644 index 0000000..44ef3bd --- /dev/null +++ b/crawler/article_fetcher.py @@ -0,0 +1,90 @@ +# -*- coding: utf-8 -*- +""" +从文章 URL 抓取正文,供 AI 提取精确数据使用。 +RSS 仅提供标题和短摘要,正文可提供伤亡、番号、地点等具体数字与事实。 +""" +import os +import re +from typing import Optional + +# 单页超时(秒) +FETCH_TIMEOUT = int(os.environ.get("ARTICLE_FETCH_TIMEOUT", "12")) +# 正文最大字符数,避免超长输入 +MAX_BODY_CHARS = int(os.environ.get("ARTICLE_MAX_BODY_CHARS", "6000")) +# 是否启用正文抓取(设为 0 则仅用标题+摘要) +FETCH_FULL_ARTICLE = os.environ.get("FETCH_FULL_ARTICLE", "1") == "1" + + +def _strip_html(html: str) -> str: + """简单去除 HTML 标签与多余空白""" + if not html: + return "" + text = re.sub(r"]*>[\s\S]*?", " ", html, flags=re.I) + text = re.sub(r"]*>[\s\S]*?", " ", text, flags=re.I) + text = re.sub(r"<[^>]+>", " ", text) + text = re.sub(r"\s+", " ", text).strip() + return text + + +def fetch_article_body(url: str, timeout: int = FETCH_TIMEOUT) -> Optional[str]: + """ + 请求文章 URL,提取正文纯文本。失败或非 HTML 返回 None。 + 优先用 BeautifulSoup 取 main/article 或 body,否则退化为正则去标签。 + """ + if not url or not url.strip().startswith("http"): + return None + try: + import requests + headers = {"User-Agent": "US-Iran-Dashboard/1.0 (News Aggregator)"} + # 不跟随代理,避免墙内超时 + proxies = {"http": None, "https": None} if os.environ.get("CRAWLER_USE_PROXY") != "1" else None + r = requests.get(url, headers=headers, timeout=timeout, proxies=proxies) + r.raise_for_status() + ct = (r.headers.get("Content-Type") or "").lower() + if "html" not in ct and "xml" not in ct: + return None + html = r.text + if not html or len(html) < 200: + return None + try: + from bs4 import BeautifulSoup + except ImportError: + return _strip_html(html)[:MAX_BODY_CHARS] + try: + soup = BeautifulSoup(html, "html.parser") + for tag in ("article", "main", "[role='main']", ".article-body", ".post-content", ".entry-content", ".content"): + if tag.startswith((".", "[")): + node = soup.select_one(tag) + else: + node = soup.find(tag) + if node: + body = node.get_text(separator=" ", strip=True) + if len(body) > 300: + return _strip_html(body)[:MAX_BODY_CHARS] + body = soup.body.get_text(separator=" ", strip=True) if soup.body else "" + if len(body) > 300: + return _strip_html(body)[:MAX_BODY_CHARS] + except Exception: + pass + return _strip_html(html)[:MAX_BODY_CHARS] + except Exception: + return None + + +def enrich_item_with_body(item: dict, max_chars: int = MAX_BODY_CHARS) -> None: + """ + 若 item 有 url 且无 full_text,则抓取正文并写入 item["full_text"]。 + 用于 AI 提取时获得更多上下文。原地修改 item。 + """ + if not FETCH_FULL_ARTICLE: + return + url = (item.get("url") or "").strip() + if not url or item.get("full_text"): + return + body = fetch_article_body(url) + if not body: + return + title = (item.get("title") or "").strip() + summary = (item.get("summary") or "").strip() + combined = f"{title}\n{summary}\n{body}" if summary else f"{title}\n{body}" + item["full_text"] = combined[:max_chars] diff --git a/crawler/config.py b/crawler/config.py index f5bc435..ee34a26 100644 --- a/crawler/config.py +++ b/crawler/config.py @@ -16,7 +16,11 @@ DASHSCOPE_API_KEY = os.environ.get("DASHSCOPE_API_KEY", "") # 抓取间隔(秒) CRAWL_INTERVAL = int(os.environ.get("CRAWL_INTERVAL", "300")) +# 单源抓取超时(秒),避免某源卡住拖垮整轮 +FEED_TIMEOUT = int(os.environ.get("FEED_TIMEOUT", "12")) + # RSS 源:世界主流媒体,覆盖美伊/中东多视角 +# 每项为 URL 字符串,或 {"name": "显示名", "url": "..."} 便于日志与排查 RSS_FEEDS = [ # 美国 "https://feeds.reuters.com/reuters/topNews", @@ -35,6 +39,9 @@ RSS_FEEDS = [ # 中国 "https://english.news.cn/rss/world.xml", "https://www.cgtn.com/rss/world", + # 凤凰网(军事 + 国际,中文视角) + {"name": "凤凰军事", "url": "https://feedx.net/rss/ifengmil.xml"}, + {"name": "凤凰国际", "url": "https://feedx.net/rss/ifengworld.xml"}, # 伊朗 "https://www.presstv.ir/rss", # 卡塔尔(中东) @@ -42,6 +49,22 @@ RSS_FEEDS = [ "https://www.aljazeera.com/xml/rss/middleeast.xml", ] + +def get_feed_sources(): + """返回 [(name, url), ...],name 用于日志,缺省为 URL 的 host""" + import urllib.parse + out = [] + for raw in RSS_FEEDS: + if isinstance(raw, dict): + name = raw.get("name") or "rss" + url = raw.get("url", "").strip() + else: + url = (raw or "").strip() + name = urllib.parse.urlparse(url).netloc or "rss" + if url: + out.append((name, url)) + return out + # 关键词过滤:至少匹配一个才会入库(与地图区域对应:伊拉克/叙利亚/海湾/红海/地中海等) KEYWORDS = [ # 伊朗 diff --git a/crawler/db_merge.py b/crawler/db_merge.py index a23326f..380d055 100644 --- a/crawler/db_merge.py +++ b/crawler/db_merge.py @@ -12,6 +12,23 @@ from typing import Any, Dict, Optional PROJECT_ROOT = Path(__file__).resolve().parent.parent DB_PATH = os.environ.get("DB_PATH", str(PROJECT_ROOT / "server" / "data.db")) +# 单次合并时各字段增量的上限,防止误把「累计总数」当增量导致数据剧增(可选,设为 0 表示不设限) +MAX_DELTA_PER_MERGE = { + "personnel_killed": 500, "personnel_wounded": 1000, "civilian_killed": 300, "civilian_wounded": 500, + "bases_destroyed": 5, "bases_damaged": 10, + "aircraft": 50, "warships": 10, "armor": 30, "vehicles": 100, + "drones": 50, "missiles": 200, "helicopters": 20, "submarines": 5, "carriers": 2, + "civilian_ships": 20, "airport_port": 10, +} + + +def _clamp_delta(key: str, value: int) -> int: + """单次增量上限,避免误提「累计」导致波动""" + cap = MAX_DELTA_PER_MERGE.get(key, 0) + if cap <= 0: + return max(0, value) + return max(0, min(value, cap)) + def _ensure_tables(conn: sqlite3.Connection) -> None: """确保所需表存在(与 db.js 一致)""" @@ -41,7 +58,7 @@ def _ensure_tables(conn: sqlite3.Connection) -> None: conn.execute("ALTER TABLE combat_losses ADD COLUMN updated_at TEXT DEFAULT (datetime('now'))") except sqlite3.OperationalError: pass - for col in ("drones", "missiles", "helicopters", "submarines", "tanks", "civilian_ships", "airport_port"): + for col in ("drones", "missiles", "helicopters", "submarines", "tanks", "carriers", "civilian_ships", "airport_port"): try: conn.execute(f"ALTER TABLE combat_losses ADD COLUMN {col} INTEGER NOT NULL DEFAULT 0") except sqlite3.OperationalError: @@ -72,19 +89,19 @@ def merge(extracted: Dict[str, Any], db_path: Optional[str] = None) -> bool: ) if conn.total_changes > 0: updated = True - # combat_losses:增量叠加到当前值,无行则先插入初始行 + # combat_losses:统一按增量处理。AI 输出为本则报道的新增数,此处叠加到库内当前值,避免把「累计总数」当增量导致数据波动。 if "combat_losses_delta" in extracted: for side, delta in extracted["combat_losses_delta"].items(): if side not in ("us", "iran"): continue try: row = conn.execute( - "SELECT personnel_killed,personnel_wounded,civilian_killed,civilian_wounded,bases_destroyed,bases_damaged,aircraft,warships,armor,vehicles,drones,missiles,helicopters,submarines,tanks,civilian_ships,airport_port FROM combat_losses WHERE side = ?", + "SELECT personnel_killed,personnel_wounded,civilian_killed,civilian_wounded,bases_destroyed,bases_damaged,aircraft,warships,armor,vehicles,drones,missiles,helicopters,submarines,tanks,carriers,civilian_ships,airport_port FROM combat_losses WHERE side = ?", (side,), ).fetchone() cur = {"personnel_killed": 0, "personnel_wounded": 0, "civilian_killed": 0, "civilian_wounded": 0, "bases_destroyed": 0, "bases_damaged": 0, "aircraft": 0, "warships": 0, "armor": 0, "vehicles": 0, - "drones": 0, "missiles": 0, "helicopters": 0, "submarines": 0, "tanks": 0, "civilian_ships": 0, "airport_port": 0} + "drones": 0, "missiles": 0, "helicopters": 0, "submarines": 0, "tanks": 0, "carriers": 0, "civilian_ships": 0, "airport_port": 0} if row: cur = { "personnel_killed": row[0], "personnel_wounded": row[1], "civilian_killed": row[2] or 0, @@ -92,38 +109,39 @@ def merge(extracted: Dict[str, Any], db_path: Optional[str] = None) -> bool: "aircraft": row[6], "warships": row[7], "armor": row[8], "vehicles": row[9], "drones": row[10] if len(row) > 10 else 0, "missiles": row[11] if len(row) > 11 else 0, "helicopters": row[12] if len(row) > 12 else 0, "submarines": row[13] if len(row) > 13 else 0, - "tanks": row[14] if len(row) > 14 else 0, "civilian_ships": row[15] if len(row) > 15 else 0, "airport_port": row[16] if len(row) > 16 else 0, + "tanks": row[14] if len(row) > 14 else 0, "carriers": row[15] if len(row) > 15 else (row[14] if len(row) > 14 else 0), + "civilian_ships": row[16] if len(row) > 16 else 0, "airport_port": row[17] if len(row) > 17 else 0, } - pk = max(0, (cur["personnel_killed"] or 0) + delta.get("personnel_killed", 0)) - pw = max(0, (cur["personnel_wounded"] or 0) + delta.get("personnel_wounded", 0)) - ck = max(0, (cur["civilian_killed"] or 0) + delta.get("civilian_killed", 0)) - cw = max(0, (cur["civilian_wounded"] or 0) + delta.get("civilian_wounded", 0)) - bd = max(0, (cur["bases_destroyed"] or 0) + delta.get("bases_destroyed", 0)) - bm = max(0, (cur["bases_damaged"] or 0) + delta.get("bases_damaged", 0)) - ac = max(0, (cur["aircraft"] or 0) + delta.get("aircraft", 0)) - ws = max(0, (cur["warships"] or 0) + delta.get("warships", 0)) - ar = max(0, (cur["armor"] or 0) + delta.get("armor", 0)) - vh = max(0, (cur["vehicles"] or 0) + delta.get("vehicles", 0)) - dr = max(0, (cur["drones"] or 0) + delta.get("drones", 0)) - ms = max(0, (cur["missiles"] or 0) + delta.get("missiles", 0)) - hp = max(0, (cur["helicopters"] or 0) + delta.get("helicopters", 0)) - sb = max(0, (cur["submarines"] or 0) + delta.get("submarines", 0)) - tk = max(0, (cur["tanks"] or 0) + delta.get("tanks", 0)) - cs = max(0, (cur["civilian_ships"] or 0) + delta.get("civilian_ships", 0)) - ap = max(0, (cur["airport_port"] or 0) + delta.get("airport_port", 0)) + pk = max(0, (cur["personnel_killed"] or 0) + _clamp_delta("personnel_killed", delta.get("personnel_killed", 0))) + pw = max(0, (cur["personnel_wounded"] or 0) + _clamp_delta("personnel_wounded", delta.get("personnel_wounded", 0))) + ck = max(0, (cur["civilian_killed"] or 0) + _clamp_delta("civilian_killed", delta.get("civilian_killed", 0))) + cw = max(0, (cur["civilian_wounded"] or 0) + _clamp_delta("civilian_wounded", delta.get("civilian_wounded", 0))) + bd = max(0, (cur["bases_destroyed"] or 0) + _clamp_delta("bases_destroyed", delta.get("bases_destroyed", 0))) + bm = max(0, (cur["bases_damaged"] or 0) + _clamp_delta("bases_damaged", delta.get("bases_damaged", 0))) + ac = max(0, (cur["aircraft"] or 0) + _clamp_delta("aircraft", delta.get("aircraft", 0))) + ws = max(0, (cur["warships"] or 0) + _clamp_delta("warships", delta.get("warships", 0))) + ar = max(0, (cur["armor"] or 0) + _clamp_delta("armor", delta.get("armor", 0))) + vh = max(0, (cur["vehicles"] or 0) + _clamp_delta("vehicles", delta.get("vehicles", 0))) + dr = max(0, (cur["drones"] or 0) + _clamp_delta("drones", delta.get("drones", 0))) + ms = max(0, (cur["missiles"] or 0) + _clamp_delta("missiles", delta.get("missiles", 0))) + hp = max(0, (cur["helicopters"] or 0) + _clamp_delta("helicopters", delta.get("helicopters", 0))) + sb = max(0, (cur["submarines"] or 0) + _clamp_delta("submarines", delta.get("submarines", 0))) + cr = max(0, (cur["carriers"] or 0) + _clamp_delta("carriers", delta.get("carriers", 0))) + cs = max(0, (cur["civilian_ships"] or 0) + _clamp_delta("civilian_ships", delta.get("civilian_ships", 0))) + ap = max(0, (cur["airport_port"] or 0) + _clamp_delta("airport_port", delta.get("airport_port", 0))) ts = datetime.utcnow().strftime("%Y-%m-%dT%H:%M:%S.000Z") if row: conn.execute( """UPDATE combat_losses SET personnel_killed=?, personnel_wounded=?, civilian_killed=?, civilian_wounded=?, bases_destroyed=?, bases_damaged=?, aircraft=?, warships=?, armor=?, vehicles=?, - drones=?, missiles=?, helicopters=?, submarines=?, tanks=?, civilian_ships=?, airport_port=?, updated_at=? WHERE side=?""", - (pk, pw, ck, cw, bd, bm, ac, ws, ar, vh, dr, ms, hp, sb, tk, cs, ap, ts, side), + drones=?, missiles=?, helicopters=?, submarines=?, tanks=?, carriers=?, civilian_ships=?, airport_port=?, updated_at=? WHERE side=?""", + (pk, pw, ck, cw, bd, bm, ac, ws, ar, vh, dr, ms, hp, sb, cur.get("tanks", 0), cr, cs, ap, ts, side), ) else: conn.execute( """INSERT OR REPLACE INTO combat_losses (side, personnel_killed, personnel_wounded, civilian_killed, civilian_wounded, - bases_destroyed, bases_damaged, aircraft, warships, armor, vehicles, drones, missiles, helicopters, submarines, tanks, civilian_ships, airport_port, updated_at) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)""", - (side, pk, pw, ck, cw, bd, bm, ac, ws, ar, vh, dr, ms, hp, sb, tk, cs, ap, ts), + bases_destroyed, bases_damaged, aircraft, warships, armor, vehicles, drones, missiles, helicopters, submarines, tanks, carriers, civilian_ships, airport_port, updated_at) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)""", + (side, pk, pw, ck, cw, bd, bm, ac, ws, ar, vh, dr, ms, hp, sb, 0, cr, cs, ap, ts), ) if conn.total_changes > 0: updated = True @@ -140,7 +158,7 @@ def merge(extracted: Dict[str, Any], db_path: Optional[str] = None) -> bool: w = extracted["wall_street"] conn.execute("INSERT INTO wall_street_trend (time, value) VALUES (?, ?)", (w["time"], w["value"])) updated = True - # key_location:更新受袭基地 status/damage_level + # key_location:更新双方攻击地点(美军基地被打击 side=us,伊朗设施被打击 side=iran)的 status/damage_level if "key_location_updates" in extracted: try: for u in extracted["key_location_updates"]: diff --git a/crawler/extractor_ai.py b/crawler/extractor_ai.py index 05c43a5..8dc443b 100644 --- a/crawler/extractor_ai.py +++ b/crawler/extractor_ai.py @@ -15,31 +15,40 @@ CLEANER_AI_DISABLED = os.environ.get("CLEANER_AI_DISABLED", "0") == "1" OLLAMA_MODEL = os.environ.get("OLLAMA_MODEL", "llama3.1") -def _call_ollama_extract(text: str, timeout: int = 10) -> Optional[Dict[str, Any]]: - """调用 Ollama 提取结构化数据。输出 JSON,仅包含新闻中可明确推断的字段""" +# 用于 AI 提取的原文最大长度(有正文时取更长以提取精确数据) +EXTRACT_TEXT_MAX_LEN = int(os.environ.get("EXTRACT_TEXT_MAX_LEN", "4000")) + + +def _call_ollama_extract(text: str, timeout: int = 15) -> Optional[Dict[str, Any]]: + """调用 Ollama 从新闻全文/摘要中提取精确结构化数据,仅填写报道中明确给出的数字与事实。""" if CLEANER_AI_DISABLED or not text or len(str(text).strip()) < 10: return None try: import requests - prompt = f"""从以下美伊/中东新闻中提取可推断的数值,输出 JSON,仅包含有明确依据的字段。无依据则省略该字段。 + raw = str(text).strip()[:EXTRACT_TEXT_MAX_LEN] + prompt = f"""从以下美伊/中东新闻**全文或摘要**中,提取**报道明确给出的数字与事实**,输出 JSON。规则: +1. 仅填写报道中**直接出现、可核对**的数据,不要推测或估算。 +2. 无明确依据的字段**必须省略**,不要填 0 或猜。 +3. **战损一律按增量**:只填本则报道中「本次/此次/今日/本轮」**新增**的伤亡或损毁数量。若报道只给「累计总数」「迄今共」「total so far」等,**不要填写**该字段(避免与库内已有累计值重复叠加)。 +4. **攻击地点**:提取双方遭袭的具体地点。美军/盟军基地被打击 → side=us;伊朗/亲伊设施被打击 → side=iran。name_keywords 用「中文名|英文名」便于匹配,可填多处。 -要求: -- summary: 1-2句中文事实,≤80字 +字段说明: +- summary: 1-2 句中文事实概括,≤80 字 - category: deployment|alert|intel|diplomatic|other - severity: low|medium|high|critical -- 战损(仅当新闻明确提及数字时填写,格式 us_XXX / iran_XXX): +- 战损(**仅填本则报道的新增增量**,如「此次 5 人丧生」「今日又损 2 架」): us_personnel_killed, iran_personnel_killed, us_personnel_wounded, iran_personnel_wounded, us_civilian_killed, iran_civilian_killed, us_civilian_wounded, iran_civilian_wounded, - us_bases_destroyed, iran_bases_destroyed, us_bases_damaged, iran_bases_damaged. - 重要:bases_* 仅指已确认损毁/受损的基地数量;"军事目标"/targets 等泛指不是基地,若报道只说"X个军事目标遭袭"而无具体基地名,不填写 bases_* + us_bases_destroyed, iran_bases_destroyed, us_bases_damaged, iran_bases_damaged, us_aircraft, iran_aircraft, us_warships, iran_warships, us_armor, iran_armor, us_vehicles, iran_vehicles, us_drones, iran_drones, us_missiles, iran_missiles, us_helicopters, iran_helicopters, us_submarines, iran_submarines, - us_tanks, iran_tanks, us_civilian_ships, iran_civilian_ships, us_airport_port, iran_airport_port -- retaliation_sentiment: 0-100,仅当新闻涉及伊朗报复情绪时 -- wall_street_value: 0-100,仅当新闻涉及美股/市场反应时 -- key_location_updates: 当新闻提及具体基地/地点遭袭时,数组项 { "name_keywords": "asad|阿萨德|assad", "side": "us", "status": "attacked", "damage_level": 1-3 } + us_carriers, iran_carriers, us_civilian_ships, iran_civilian_ships, us_airport_port, iran_airport_port +- retaliation_sentiment: 0-100,仅当报道涉及伊朗报复/反击情绪时 +- wall_street_value: 0-100,仅当报道涉及美股/市场时 +- key_location_updates: **双方攻击地点**。每项 {{ "name_keywords": "阿萨德|asad|al-asad", "side": "us或iran(被打击方)", "status": "attacked", "damage_level": 1-3 }}。美军基地例:阿萨德|asad、乌代德|udeid、埃尔比勒|erbil、因吉尔利克|incirlik。伊朗例:德黑兰|tehran、布什尔|bushehr、伊斯法罕|isfahan、阿巴斯|abbas、纳坦兹|natanz -原文:{str(text)[:800]} +原文: +{raw} 直接输出 JSON,不要解释:""" r = requests.post( @@ -48,7 +57,7 @@ def _call_ollama_extract(text: str, timeout: int = 10) -> Optional[Dict[str, Any "model": OLLAMA_MODEL, "messages": [{"role": "user", "content": prompt}], "stream": False, - "options": {"num_predict": 256}, + "options": {"num_predict": 384}, }, timeout=timeout, ) @@ -82,7 +91,7 @@ def extract_from_news(text: str, timestamp: Optional[str] = None) -> Dict[str, A # combat_losses 增量(仅数字字段) loss_us = {} loss_ir = {} - for k in ["personnel_killed", "personnel_wounded", "civilian_killed", "civilian_wounded", "bases_destroyed", "bases_damaged", "aircraft", "warships", "armor", "vehicles", "drones", "missiles", "helicopters", "submarines", "tanks", "civilian_ships", "airport_port"]: + for k in ["personnel_killed", "personnel_wounded", "civilian_killed", "civilian_wounded", "bases_destroyed", "bases_damaged", "aircraft", "warships", "armor", "vehicles", "drones", "missiles", "helicopters", "submarines", "carriers", "civilian_ships", "airport_port"]: uk = f"us_{k}" ik = f"iran_{k}" if uk in parsed and isinstance(parsed[uk], (int, float)): diff --git a/crawler/extractor_dashscope.py b/crawler/extractor_dashscope.py index 439e9b9..9001ba8 100644 --- a/crawler/extractor_dashscope.py +++ b/crawler/extractor_dashscope.py @@ -13,8 +13,11 @@ from typing import Any, Dict, Optional from panel_schema import validate_category, validate_severity, validate_summary +EXTRACT_TEXT_MAX_LEN = int(os.environ.get("EXTRACT_TEXT_MAX_LEN", "4000")) + + def _call_dashscope_extract(text: str, timeout: int = 15) -> Optional[Dict[str, Any]]: - """调用阿里云 DashScope 提取结构化数据""" + """调用阿里云 DashScope 从新闻全文中提取精确结构化数据,仅填写报道明确给出的数字与事实。""" api_key = os.environ.get("DASHSCOPE_API_KEY", "").strip() if not api_key or not text or len(str(text).strip()) < 10: return None @@ -23,27 +26,25 @@ def _call_dashscope_extract(text: str, timeout: int = 15) -> Optional[Dict[str, from http import HTTPStatus dashscope.api_key = api_key + raw = str(text).strip()[:EXTRACT_TEXT_MAX_LEN] - prompt = f"""从以下美伊/中东军事新闻中提取可明确推断的数值,输出 JSON。无依据的字段省略不写。 + prompt = f"""从以下美伊/中东新闻**全文或摘要**中,提取**报道明确给出的数字与事实**,输出 JSON。规则: +1. 仅填写报道中**直接出现、可核对**的数据,不要推测或估算。 +2. 无明确依据的字段**必须省略**,不要填 0 或猜。 +3. **战损一律按增量**:只填本则报道中「本次/此次/今日」**新增**数量。报道若只给「累计总数」「迄今共」**不要填**该字段。 +4. **攻击地点**:提取双方遭袭地点。美军/盟军基地被打击 → side=us;伊朗/亲伊设施被打击 → side=iran。name_keywords 用「中文|英文」,可填多处。 -要求: -- summary: 1-2句中文事实摘要,≤80字 +字段: +- summary: 1-2 句中文事实概括,≤80 字 - category: deployment|alert|intel|diplomatic|other - severity: low|medium|high|critical -- 战损(仅当新闻明确提及数字时填写): - us_personnel_killed, iran_personnel_killed, us_personnel_wounded, iran_personnel_wounded, - us_civilian_killed, iran_civilian_killed, us_civilian_wounded, iran_civilian_wounded, - us_bases_destroyed, iran_bases_destroyed, us_bases_damaged, iran_bases_damaged. - 重要:bases_* 仅指已确认损毁/受损的基地数量;"军事目标"/"targets"等泛指不是基地,若报道只说"X个军事目标遭袭"而无具体基地名,不填写 bases_* - us_aircraft, iran_aircraft, us_warships, iran_warships, us_armor, iran_armor, us_vehicles, iran_vehicles, - us_drones, iran_drones, us_missiles, iran_missiles, us_helicopters, iran_helicopters, us_submarines, iran_submarines, - us_tanks, iran_tanks, us_civilian_ships, iran_civilian_ships, us_airport_port, iran_airport_port -- retaliation_sentiment: 0-100,仅当新闻涉及伊朗报复/反击情绪时 -- wall_street_value: 0-100,仅当新闻涉及美股/市场反应时 -- key_location_updates: 当新闻提及具体基地/设施遭袭时必填,数组 [{{"name_keywords":"阿萨德|asad|assad|阿因","side":"us","status":"attacked","damage_level":1-3}}]。常用关键词:阿萨德|asad|巴格达|baghdad|乌代德|udeid|埃尔比勒|erbil|因吉尔利克|incirlik|德黑兰|tehran|阿巴斯|abbas|布什尔|bushehr|伊斯法罕|isfahan|纳坦兹|natanz +- 战损(**仅填本则报道的新增增量**): us_personnel_killed, iran_personnel_killed, us_personnel_wounded, iran_personnel_wounded, us_civilian_killed, iran_civilian_killed, us_civilian_wounded, iran_civilian_wounded, us_bases_destroyed, iran_bases_destroyed, us_bases_damaged, iran_bases_damaged, us_aircraft, iran_aircraft, us_warships, iran_warships, us_armor, iran_armor, us_vehicles, iran_vehicles, us_drones, iran_drones, us_missiles, iran_missiles, us_helicopters, iran_helicopters, us_submarines, iran_submarines, us_carriers, iran_carriers, us_civilian_ships, iran_civilian_ships, us_airport_port, iran_airport_port +- retaliation_sentiment: 0-100(仅当报道涉及伊朗报复情绪时) +- wall_street_value: 0-100(仅当报道涉及美股/市场时) +- key_location_updates: **双方攻击地点**。每项 {{"name_keywords":"阿萨德|asad","side":"us或iran(被打击方)","status":"attacked","damage_level":1-3}}。美军基地:阿萨德|asad、乌代德|udeid、埃尔比勒|erbil、因吉尔利克|incirlik。伊朗:德黑兰|tehran、布什尔|bushehr、伊斯法罕|isfahan、阿巴斯|abbas、纳坦兹|natanz 原文: -{str(text)[:800]} +{raw} 直接输出 JSON,不要其他解释:""" @@ -86,7 +87,7 @@ def extract_from_news(text: str, timestamp: Optional[str] = None) -> Dict[str, A loss_ir = {} for k in ["personnel_killed", "personnel_wounded", "civilian_killed", "civilian_wounded", "bases_destroyed", "bases_damaged", "aircraft", "warships", "armor", "vehicles", - "drones", "missiles", "helicopters", "submarines", "tanks", "civilian_ships", "airport_port"]: + "drones", "missiles", "helicopters", "submarines", "carriers", "civilian_ships", "airport_port"]: uk, ik = f"us_{k}", f"iran_{k}" if uk in parsed and isinstance(parsed[uk], (int, float)): loss_us[k] = max(0, int(parsed[uk])) diff --git a/crawler/extractor_rules.py b/crawler/extractor_rules.py index 0f04164..4414897 100644 --- a/crawler/extractor_rules.py +++ b/crawler/extractor_rules.py @@ -172,13 +172,13 @@ def extract_from_news(text: str, timestamp: Optional[str] = None) -> Dict[str, A else: loss_us["submarines"] = v - # 坦克 tank / 坦克 - v = _first_int(t, r"(\d+)[\s\w]*(?:tank|坦克)[\s\w]*(?:destroyed|damaged|lost|hit|摧毁|损毁|击毁)") + # 航母 carrier / 航空母舰 / 航母 + v = _first_int(t, r"(\d+)[\s\w]*(?:carrier|aircraft\s*carrier|航母|航空母舰)[\s\w]*(?:destroyed|damaged|lost|hit|sunk|摧毁|损毁|击毁|沉没)") if v is not None: if "iran" in t or "iranian" in t: - loss_ir["tanks"] = v + loss_ir["carriers"] = v else: - loss_us["tanks"] = v + loss_us["carriers"] = v # 民船 civilian ship / 商船 / 民船 v = _first_int(t, r"(\d+)[\s\w]*(?:civilian\s*ship|merchant|商船|民船)[\s\w]*(?:sunk|damaged|hit|击沉|受损)") diff --git a/crawler/main.py b/crawler/main.py index 3dfc6ab..0414aa4 100644 --- a/crawler/main.py +++ b/crawler/main.py @@ -1,5 +1,5 @@ # -*- coding: utf-8 -*- -"""爬虫入口:定时抓取 → 解析 → 入库 → 通知 API""" +"""爬虫入口:定时执行完整写库流水线(抓取 → 清洗 → 去重 → 映射 → 更新表 → 通知 API)""" import time import sys from pathlib import Path @@ -8,34 +8,18 @@ from pathlib import Path sys.path.insert(0, str(Path(__file__).resolve().parent)) from config import DB_PATH, API_BASE, CRAWL_INTERVAL -from scrapers.rss_scraper import fetch_all -from db_writer import write_updates - - -def notify_api() -> bool: - """调用 Node API 触发立即广播""" - try: - import urllib.request - req = urllib.request.Request( - f"{API_BASE}/api/crawler/notify", - method="POST", - headers={"Content-Type": "application/json"}, - ) - with urllib.request.urlopen(req, timeout=5) as resp: - return resp.status == 200 - except Exception as e: - print(f" [warn] notify API failed: {e}") - return False +from pipeline import run_full_pipeline def run_once() -> int: - items = fetch_all() - if not items: - return 0 - n = write_updates(items) - if n > 0: - notify_api() - return n + """执行一轮:抓取、清洗、去重、映射、写表、通知。返回本轮新增条数(面板或资讯)。""" + n_fetched, n_news, n_panel = run_full_pipeline( + db_path=DB_PATH, + api_base=API_BASE, + translate=True, + notify=True, + ) + return n_panel or n_news def main() -> None: @@ -45,7 +29,7 @@ def main() -> None: try: n = run_once() if n > 0: - print(f"[{time.strftime('%H:%M:%S')}] Inserted {n} new update(s)") + print(f"[{time.strftime('%H:%M:%S')}] 抓取完成,去重后新增 {n} 条,已写库并通知 API") except KeyboardInterrupt: break except Exception as e: diff --git a/crawler/panel_schema.py b/crawler/panel_schema.py index d003beb..e9a1ac5 100644 --- a/crawler/panel_schema.py +++ b/crawler/panel_schema.py @@ -19,7 +19,7 @@ TimeSeriesPoint = Tuple[str, int] # (ISO time, value) # AI 可从新闻中提取的字段 EXTRACTABLE_FIELDS = { "situation_update": ["summary", "category", "severity", "timestamp"], - "combat_losses": ["personnel_killed", "personnel_wounded", "civilian_killed", "civilian_wounded", "bases_destroyed", "bases_damaged", "aircraft", "warships", "armor", "vehicles", "drones", "missiles", "helicopters", "submarines", "tanks", "civilian_ships", "airport_port"], + "combat_losses": ["personnel_killed", "personnel_wounded", "civilian_killed", "civilian_wounded", "bases_destroyed", "bases_damaged", "aircraft", "warships", "armor", "vehicles", "drones", "missiles", "helicopters", "submarines", "tanks", "carriers", "civilian_ships", "airport_port"], "retaliation": ["value"], # 0-100 "wall_street_trend": ["time", "value"], # 0-100 "conflict_stats": ["estimated_casualties", "estimated_strike_count"], diff --git a/crawler/pipeline.py b/crawler/pipeline.py new file mode 100644 index 0000000..df8494e --- /dev/null +++ b/crawler/pipeline.py @@ -0,0 +1,150 @@ +# -*- coding: utf-8 -*- +""" +统一写库流水线:抓取 → 清洗 → 去重 → 映射到前端库字段 → 更新表 → 通知 +与 server/README.md 第五节「爬虫侧写库链路」一致,供 main.py 与 realtime_conflict_service 共用。 +""" +import os +from datetime import datetime, timezone +from typing import Callable, Optional, Tuple + +from config import DB_PATH, API_BASE + + +def _notify_api(api_base: str) -> bool: + """调用 Node API 触发立即广播""" + try: + import urllib.request + req = urllib.request.Request( + f"{api_base.rstrip('/')}/api/crawler/notify", + method="POST", + headers={"Content-Type": "application/json"}, + ) + with urllib.request.urlopen(req, timeout=5) as resp: + return resp.status == 200 + except Exception as e: + print(f" [warn] notify API failed: {e}") + return False + + +def _extract_and_merge(items: list, db_path: str) -> bool: + """AI 从新闻全文或标题+摘要中提取精确结构化数据,合并到 combat_losses / key_location 等表。""" + if not items or not os.path.exists(db_path): + return False + try: + from db_merge import merge + use_dashscope = bool(os.environ.get("DASHSCOPE_API_KEY", "").strip()) + if use_dashscope: + from extractor_dashscope import extract_from_news + limit = 10 + elif os.environ.get("CLEANER_AI_DISABLED", "0") == "1": + from extractor_rules import extract_from_news + limit = 25 + else: + from extractor_ai import extract_from_news + limit = 10 + merged_any = False + for it in items[:limit]: + # 优先用正文(article_fetcher 抓取),否则用标题+摘要,供 AI 提取精确数字 + text = it.get("full_text") or ((it.get("title", "") or "") + " " + (it.get("summary", "") or "")) + if len(text.strip()) < 20: + continue + pub = it.get("published") + ts = None + if pub: + try: + if isinstance(pub, str): + pub_dt = datetime.fromisoformat(pub.replace("Z", "+00:00")) + else: + pub_dt = pub + if pub_dt.tzinfo: + pub_dt = pub_dt.astimezone(timezone.utc) + ts = pub_dt.strftime("%Y-%m-%dT%H:%M:%S.000Z") + except Exception: + pass + extracted = extract_from_news(text, timestamp=ts) + if extracted and merge(extracted, db_path=db_path): + merged_any = True + return merged_any + except Exception as e: + print(f" [warn] AI 面板数据提取/合并: {e}") + return False + + +def run_full_pipeline( + db_path: Optional[str] = None, + api_base: Optional[str] = None, + *, + translate: bool = True, + notify: bool = True, + on_notify: Optional[Callable[[], None]] = None, +) -> Tuple[int, int, int]: + """ + 执行完整写库链路: + 1. 爬虫抓取实时数据 + 2. AI 清洗(标题/摘要/分类)→ 有效数据 + 3. 去重(news_content content_hash)→ 仅新项进入后续 + 4. 有效数据映射到前端库字段(situation_update、news_content、combat_losses 等) + 5. 更新数据库表;若有更新则通知后端 + + translate: 是否对标题/摘要做翻译(英→中) + notify: 是否在流水线末尾调用 POST /api/crawler/notify + on_notify: 若提供,在通知前调用(供 gdelt 服务做 GDELT 回填等) + + 返回: (本轮抓取条数, 去重后新增资讯数, 写入 situation_update 条数) + """ + path = db_path or DB_PATH + base = api_base or API_BASE + + from scrapers.rss_scraper import fetch_all + from db_writer import write_updates + from news_storage import save_and_dedup + from cleaner_ai import clean_news_for_panel, ensure_category, ensure_severity + + # 1. 抓取 + items = fetch_all() + if not items: + return 0, 0, 0 + + # 2. 清洗(标题/摘要/分类,符合面板 schema) + if translate: + from translate_utils import translate_to_chinese + for it in items: + raw_title = translate_to_chinese(it.get("title", "") or "") + raw_summary = translate_to_chinese(it.get("summary", "") or it.get("title", "")) + it["title"] = clean_news_for_panel(raw_title, max_len=80) + it["summary"] = clean_news_for_panel(raw_summary or raw_title, max_len=120) + else: + for it in items: + it["title"] = clean_news_for_panel(it.get("title", "") or "", max_len=80) + it["summary"] = clean_news_for_panel(it.get("summary", "") or it.get("title", ""), max_len=120) + for it in items: + it["category"] = ensure_category(it.get("category", "other")) + it["severity"] = ensure_severity(it.get("severity", "medium")) + it["source"] = it.get("source") or "rss" + + # 3. 去重:落库 news_content,仅新项返回 + new_items, n_news = save_and_dedup(items, db_path=path) + + # 3.5 数据增强:为参与 AI 提取的条目抓取正文,便于从全文提取精确数据(伤亡、基地等) + if new_items: + try: + from article_fetcher import enrich_item_with_body + # 仅对前若干条抓取正文,避免单轮请求过多 + enrich_limit = int(os.environ.get("ARTICLE_FETCH_LIMIT", "10")) + for it in new_items[:enrich_limit]: + enrich_item_with_body(it) + except Exception as e: + print(f" [warn] 正文抓取: {e}") + + # 4. 映射到前端库字段并更新表 + n_panel = write_updates(new_items) if new_items else 0 + if new_items: + _extract_and_merge(new_items, path) + + # 5. 通知(有新增时才通知;可选:先执行外部逻辑如 GDELT 回填,再通知) + if on_notify: + on_notify() + if notify and (n_panel > 0 or n_news > 0): + _notify_api(base) + + return len(items), n_news, n_panel diff --git a/crawler/realtime_conflict_service.py b/crawler/realtime_conflict_service.py index 5e4e626..ea46f00 100644 --- a/crawler/realtime_conflict_service.py +++ b/crawler/realtime_conflict_service.py @@ -283,93 +283,34 @@ def _rss_to_gdelt_fallback() -> None: # ========================== -# RSS 新闻抓取:资讯落库(去重) → AI 提取 → 面板数据落库 → 通知前端 +# RSS 新闻抓取:使用统一流水线(抓取 → 清洗 → 去重 → 映射 → 写表 → 通知) # ========================== LAST_FETCH = {"items": 0, "inserted": 0, "error": None} def fetch_news() -> None: + """执行完整写库流水线;GDELT 禁用时用 RSS 回填 gdelt_events,再通知 Node。""" try: - from scrapers.rss_scraper import fetch_all - from db_writer import write_updates - from news_storage import save_and_dedup - from translate_utils import translate_to_chinese - from cleaner_ai import clean_news_for_panel - from cleaner_ai import ensure_category, ensure_severity + from pipeline import run_full_pipeline LAST_FETCH["error"] = None - items = fetch_all() - for it in items: - raw_title = translate_to_chinese(it.get("title", "") or "") - raw_summary = translate_to_chinese(it.get("summary", "") or it.get("title", "")) - it["title"] = clean_news_for_panel(raw_title, max_len=80) - it["summary"] = clean_news_for_panel(raw_summary or raw_title, max_len=120) - it["category"] = ensure_category(it.get("category", "other")) - it["severity"] = ensure_severity(it.get("severity", "medium")) - it["source"] = it.get("source") or "rss" - # 1. 历史去重:资讯内容落库 news_content(独立表,便于后续消费) - new_items, n_news = save_and_dedup(items, db_path=DB_PATH) - # 2. 面板展示:新增资讯写入 situation_update(供前端 recentUpdates) - n_panel = write_updates(new_items) if new_items else 0 - LAST_FETCH["items"] = len(items) + n_fetched, n_news, n_panel = run_full_pipeline( + db_path=DB_PATH, + api_base=API_BASE, + translate=True, + notify=False, + ) + LAST_FETCH["items"] = n_fetched LAST_FETCH["inserted"] = n_news - # 3. AI 提取 + 合并到 combat_losses / key_location 等 - if new_items: - _extract_and_merge_panel_data(new_items) - # GDELT 禁用时用 RSS 填充 gdelt_events,使地图有冲突点 if GDELT_DISABLED: _rss_to_gdelt_fallback() _notify_node() - print(f"[{datetime.now().strftime('%H:%M:%S')}] RSS 抓取 {len(items)} 条,去重后新增 {n_news} 条资讯,面板 {n_panel} 条") + if n_fetched > 0: + print(f"[{datetime.now().strftime('%H:%M:%S')}] RSS 抓取 {n_fetched} 条,去重后新增 {n_news} 条资讯,面板 {n_panel} 条") except Exception as e: LAST_FETCH["error"] = str(e) print(f"[{datetime.now().strftime('%H:%M:%S')}] 新闻抓取失败: {e}") -def _extract_and_merge_panel_data(items: list) -> None: - """AI 分析提取面板相关数据,清洗后落库""" - if not items or not os.path.exists(DB_PATH): - return - try: - from db_merge import merge - use_dashscope = bool(os.environ.get("DASHSCOPE_API_KEY", "").strip()) - if use_dashscope: - from extractor_dashscope import extract_from_news - limit = 10 - elif os.environ.get("CLEANER_AI_DISABLED", "0") == "1": - from extractor_rules import extract_from_news - limit = 25 - else: - from extractor_ai import extract_from_news - limit = 10 - from datetime import timezone - merged_any = False - for it in items[:limit]: - text = (it.get("title", "") or "") + " " + (it.get("summary", "") or "") - if len(text.strip()) < 20: - continue - pub = it.get("published") - ts = None - if pub: - try: - if isinstance(pub, str): - pub_dt = datetime.fromisoformat(pub.replace("Z", "+00:00")) - else: - pub_dt = pub - if pub_dt.tzinfo: - pub_dt = pub_dt.astimezone(timezone.utc) - ts = pub_dt.strftime("%Y-%m-%dT%H:%M:%S.000Z") - except Exception: - pass - extracted = extract_from_news(text, timestamp=ts) - if extracted: - if merge(extracted, db_path=DB_PATH): - merged_any = True - if merged_any: - _notify_node() - except Exception as e: - print(f" [warn] AI 面板数据提取/合并: {e}") - - # ========================== # 定时任务(asyncio 后台任务,避免 APScheduler executor 关闭竞态) # ========================== diff --git a/crawler/requirements.txt b/crawler/requirements.txt index 427768e..bc93781 100644 --- a/crawler/requirements.txt +++ b/crawler/requirements.txt @@ -1,5 +1,6 @@ requests>=2.31.0 feedparser>=6.0.0 +beautifulsoup4>=4.12.0 pytest>=7.0.0 fastapi>=0.109.0 uvicorn>=0.27.0 diff --git a/crawler/scrapers/__pycache__/rss_scraper.cpython-311.pyc b/crawler/scrapers/__pycache__/rss_scraper.cpython-311.pyc index 5882ceb1c19095e65e7b19722560348d40d774ad..50257baa56cfa103abab7525e1964464f3b2a23e 100644 GIT binary patch literal 4945 zcma)9`)^a%9Y6PFU%wsa4Pm$?q?nYXDYQ@)A^}NPpwNwwwxk8i_}&DAAL+T*1k&rt z>OeEpfTgnHzBX55ER=4v60I^ywbDOevL{Qku9PWKF^`|>idKsFY2R~i96v&(9Up(s z>w8|`bMEK!J%6>?%m_m3$mPH$3qs$KLOGZgG7oM*<{}c%C=w`v4pO6(meQj%q)d?+(_Mh`_sEu-@fQI1*|Q9oQYCW3L=v&oEBL|zG1wan91DcUdq;-1_WH4JGAQC+2{IEz z1xu25q0lpNLbd2C_L9}&HWg?-iqhwe>7~haks1#NR^Um zZXO3J#3m1(1#uB62)9(6PCu`|D`*iZz)Y|l+%wsuAXWr78 zwRBG7EZg@0bdbu*;&HiV17z{q29869Y+wRVECcza;WAWRU95)Bi(V`s_~4e4&nZ)> zs>;>&RH$RxM?p59><}33Qsd>Izwde2J{OEkiWn2idKm16M5SbvI$FMax+NOCA)o9& zEK1&^;)%%!79_kHR6F1=y#NC2VX`HbrCk5vm;IYxmg76}d`EoiEV%|W*QSXCRj^pk zZ#=W{^mEDQ(o|v~F;Hl1O>IbRC^5*oY|i3LJcCz~1+ZZsAQ==$7$=i&Z6OG(f<8b3r*I#jkJ$P-$C01j4}28ke#`yLI+^-D!wdS8R2k}E<%=Hx zth}o#~g|wQ*{r@V}Z4J#f;60SuhB;Y61*X&|*<+id}IiNU(ocH!Fpb zUn>UqiXG9NFq-a7fSYi8<`cK(Uff85-=O;wcM|UwZ@yKWnYs7R3&juqc=yVUV&dYx zkEg$jzXi_3z3YFvcm2=Bb6?&6{aL7)G82+nw{;w7XsUT+c-;={3$GjWNrw+bd|2r1 z>v;s7BQOA8@lJIA z#bZ|uQpA5$l+^}cUO^o5MT7DJa;Oc`k}5NCel#k_DyOBYY3R6L)YrsK;6PYbjf9Q^ zqJ#-CY4ERMp2nh>cp$35H!&fG1&{7J= zb$b_#qHnpEw7H4sBMJ=c7^q1+COM%9en3sof!P6m900Qy96rsX!$Y9j34iH(FlZbV zU2S>S+MMgjyz9yMu36TcXFJlbbz+29n&zK9% z%L;3Hv#py`lW94_q~!~-RP3`6pvFS`?YTI&ji!pDbnOnA&ty`E|t0p``LAR;EMrW{7!>JCdSId+gJz+qp-4}Ttam| zTMP&jdWt|j27#)etw0M*HE2RBTWKAmV@AcO(BSw`$(7(HEs}U;9zs{Oa8ALSB~!Js zimt$&L;&MetE{xu?sO2K=}$dh$`Ll6ejSZQpajM#E0g7A>fz& zkQhs;1vrTKN}F0BF)N7?ordi;1prZ3~0>HCJiv1MjU`mywH zW+0u}qWObB3*<@?D3mpP!{j7X#wQ4P3(0W-Sqw=Y9ymsS&tme{R#{0D!xd;&xkJQ< zRav5~GT_Kc1jWmss^h6?ap`yj8h628`WFa5Qu{A*c6Z+Hj{mA)0qmnVmj-TgZOh_A zvyXLt+Wn-+bm`5kYkgvS!Qn~?i8m8(7MyJbtMmK|XI@ARpWdC^o!C8R zZ8-npnHO~vPH(uj`IF68`akW5a;|+x9;9_=-nuiZ#ga{%&xx!J@fUTowx~w0SBUt% zUQ8T$Owgn9V#S-Fs2co{@K|76dmm_K*N?)alc(1g(3nu;L^vOK@sekf${vbDf*M<@ z453nBc?l%BA^HIl+u|KW5Qkqg{X@P#6cM69aR48LI^vy5Js{!XNKtcWB#TB0$Q0Lp z1!RkBzXEz9`=~gF8nWd$hklv;`B*^R+1gk#@KjfN4ICf^NM0*iDC+6ddN_RQ>9n3` z`6|J4R6}}u2|)RKDP?4%s&RV4&T1{5LrOfSuJ&d4|6@eDQfVh`0wHKaDA*$bOX L2(mvbG<*IJ+BMw* delta 2163 zcmaJ?Yit`u5Z=9q@6M0JaopH(oWyC;v~EL48bk$w@@NtPZFwYAP>W*H+?ksA5%w+w z&{>XnhzdpG+N-Fg7BCVjLFGp!{vaU){e$42U0LesBukYLNaY7ODo7E(n7uqo+QQ!6 z+|0MLGqbldyVd@BwD`Ht=SI*DUH>e7SVHJ$Dr_`vow@NoFlUjB5=h1}lfelLUpB*J znFNz%6D(fCxdaC|m*KOngexl~1PA9cVpd8>z`GPC#mK@tT*57jiU;HgPl}Tz0KBr> z0em2PKyHG;X2svW5ekBdUcg&qW(vi9^EBwmsj1kn@0-7T@S%A>?rdXh36SK(`1shA zEgl>{`rK2KV^d$fh0BewAZ8@b)mUDh3iGZ9r{LQq@R$E_p)B zW=JO>A@d^l)1C-$&rZY{o6qDI6hd4e$63-%r(n&-vnmU0ECyE0Pxuhu2U3Um4gbtw z7jQH1ZZ;2aRQbk!pzuSnj54TzUPcA1vF4|~cAn8unK3aRG@s_1Tm=?HmWl&1a{)rV zNW*m+MiSD*4GEz!WZ+|c1rAV;In4x%Iw!L-mxlfLlD`1%E>g!%EATmgFVdPeWQ6(< zfjvBmmUb_o*V&_J0qguLo5sh`vEK_Wt@Wn9?vh=yu+{1umW6__sZ|7RP!n%c3v8;a zX-KzeP(z}&?O*Q1t!pnxx7cvsZsU0{;?{+8NcQN`IrI@*@Mv`J*PHIyYEs2ZL>FWv zd*5YlCY|n1F$J&g)tT22!Ao!JOxf#f&Pmb#7)DyhhB>;{tp*D|ji%6gyVczb3_^vb zLbHy`&D7wFx}ZzMtxFJr@81#RFpRkW&yn7wH_J`uZ*h;&(jNw1>X*<)2AO?AK+HvY zw*cnHg4m*CD5>Tr3B-i1)50>}lOCL~c`dDF6hhPA7S#D{cAC6ldt^nOfg;h;`JByV z(z#h1$F4sOI%xxPGwvdB;K^<}iJd1IGve;{#r?KR&Ckp#n$4=_l&?erM_MGe*VX7rFbk^edwjyLoWeTkyD1J z9@$Zztw#3NB6|&?9_lL1ms88ftD)gqXt)yEcWKh#tVnkyvY-iB!D?u* z78-;=TRN>^)CxsQQ>EjjsWWeu@2RxyGFYp%t<-0X8l!a7A3Q6b7E8s}Q@%4k!&i^) zT;{9M!CG|Ckm?;nmx9%fdutu{o)Qh#n6mua&U#OKtmyc2cfDs|`AD^Aq}DTH#d@JPd_q^Cfq<+ap{4atN&Ut3{uUp|=H>Z}vY$K& z7`;=}-+%-mkubi7{1xQ4khJJL=4hb5t-pfq(9J?X9f;!=u+2~6`#4Uz5Db!aZ(pTFbGsu5Zg#as*q(-u%kX8L3LcK+lJNk&e*>3-G}GS zI<<8dMRhD1S_exBsc=kd^Qcm&s3JvCl%(P-f5Uu2YSxcW{lF(GK>X%jJ5H0knz?h% zJnlJX&hK~D%jYu$&r{wZv{>*RZi91vRg}+)A5`gWNO)RmJ)6 z&Yy1X-1_r_J1d<}Z+5P%be3;FxOWZuT^vbHl?>%FPjczkxN>#B*r@Z8soa-GUz<4n z%E*|?og5tHi-ecmeFX<;};Tz}}9U*65YEwxf5>ZB$%(f93!dg5oX=E_MjAa91GNp0O z^yy_fK@3947_o@7q&;OU{FFkbQ$!QnYpCb<)StK8is^ z#34yG;)IXLra6ZAB2MP1#njO2zZgFI-C?EAG6Yv<*qSU+p?Af8BwY_g*cD3QH!Cx; z7W6_9>Oc`HzJeaREy>@MPa2~GsatH!_b`Y^(_wUjY!Y0*PJ^TR>a`rvc_NCy7>t`M zGMI_06i;E#nUnZ{9W>5!AuPxxQ+yRKMWv#%kK5T@sd=(G!^6rfUpU_oEIfv_VN4Q; zp^hWs_jP@YrL4LK5ST3ysFX%RqqXOQwt4^27XW5SeE2cKtAh#qO}t_ICZ%Mtx!5bs za~6^r?Bb&_bH!|16QuqElGa|3l(cQE85n-&;%AJxwNvgJfn-+y13J zq0cuWB(qNWL^ntVvF1Qu&mD{hvA^v^*0~g-`GqCdw_*RTBPZZcS#R6n5lgPV3$<+J$yk znZKg+*;(|X1iK1A)*PlfwUgn`Ac6+ zc6V37VhOu#y?w27|EJSZ-Lr&Zhf$G>yMqw_jZ4^ z0?lGZm0AyOGT|}kgwH>Y&n}b_;Io37 z6BiX7wD2T4Vz@K|j?*mXP~*QDyFhzr(a8L-VDuUm=8E(fuv?@BjExw{dTkHd-tXU; zv>3ukRejw4qX|-b98=Im3?x}1Z4gmdmfC~K! zfEb7iRSD46=?%$xnoJq(RFsMgP*-{GSrC~O#c3YV4V&4_xrlWe&L&P?qH+2^J9jM7 zRVvkb6{aKr*t&FKYyHyh^2+w7f7@QW_y`h`ntxF3<2s+zd|mCAZ**>6NHEY@y`10# zV2H2i)b7vmf%sm(R30?5X%Zbn3y-7o5sI@%;WkAGP>g3y0UnEtE5@7!L#fcT9p54f zp3y%j^d?20xY*1P^Q*Gjs5OUmjsF)KS^QdZ*d%?!UDtKjXkwyZ1{V~MYD9202R~xS2+KBxwlh_m?QHkVY;{f6 z4c*LP<3$h`8dfkXY|yh8{|C{3p${G+3VZY_2)?S7D5;^|tKaW^bk(a@^Y!wh-aPGe zd<5fz)9xx?N2R_u#F3h!BVl4*_?07jRefM6`-bNRLOrEO0VJ z{e_v{@{Qr}g+IP%e*fyIS;NmSlhy+!xw^hS7`E>F>zf-N-53miRQNo`i%mn`!(sC) zSq+vTv`<=GN~a-o_{P(K<>F)U6s+1HDV5Bo$+jx;XMiXP2!!P-NQkwR_)J0gMQ^G3 zj$WBB1O4$;FxdHIPe^HNH!7b6)ne7+ohS1zBt@!5+oDW%#Qxqy^70(`mgXXGh+Q+~ zdEiDcACCxD0nB${me(;{2j(b8^BUboHCFfl)uciv~Ily7^Phlvgp%xN|_K z_qbC#2D^agbb4ffXN*T+bd_JZ2e|eO?;HLH;Qh*ZfOt@O574)?4pdMD2e|xsce`>p z;^F6H6;=KgslzI)$Xz5KfrXp^(g#tUib2kf>guRZNZqY_6+Y>0R!-#|V!2XzkffuN zj0tf5SwPjTdc1qgbc)^W-L(y*Yg18;$Kii#I4g$c>RppiS#+u|qjA_3ueDXCav_(S zTkO)g)9b)zhv3p@@*=F|B@iv0$-Mc7UFh_qmX(vyj!-Qvn_t*vTUt8PpX|ci5o2g8 zQA(ZLG?TgGmZqFb-ItQ-Tr8bV#hCXa`HF^irbJP;Rw=|d4kkHIR91+|Ov#tE+1e6< z%bt$b+i3%t?j_}{C{np^_9kD_HfY81_#U%_Pn97s0Q* z1_D7xT9^?R`=EVHKoZO_`P;Wcz&?)fbKn!LKm8%@Sr*= bool: return False -def fetch_all() -> list[dict]: - import socket - items: list[dict] = [] - seen: set[str] = set() - # 单源超时 10 秒,避免某源卡住 +def _fetch_one_feed(name: str, url: str, timeout: int) -> list[dict]: + """抓取单个 RSS 源,超时或异常返回空列表。不负责去重。""" old_timeout = socket.getdefaulttimeout() - socket.setdefaulttimeout(10) + socket.setdefaulttimeout(timeout) try: - for url in RSS_FEEDS: - try: - feed = feedparser.parse( - url, - request_headers={"User-Agent": "US-Iran-Dashboard/1.0"}, - agent="US-Iran-Dashboard/1.0", - ) - except Exception: - continue - for entry in feed.entries: - title = getattr(entry, "title", "") or "" - raw_summary = getattr(entry, "summary", "") or getattr(entry, "description", "") or "" - summary = _strip_html(raw_summary) - link = getattr(entry, "link", "") or "" - text = f"{title} {summary}" - if not _matches_keywords(text): - continue - key = (title[:80], link) - if key in seen: - continue - seen.add(key) - published = _parse_date(entry) - cat, sev = classify_and_severity(text) - items.append({ - "title": title, - "summary": summary[:400] if summary else title, - "url": link, - "published": _parse_date(entry), - "category": cat, - "severity": sev, - }) + feed = feedparser.parse( + url, + request_headers={"User-Agent": "US-Iran-Dashboard/1.0"}, + agent="US-Iran-Dashboard/1.0", + ) + except Exception as e: + print(f" [rss] {name} error: {e}") + return [] finally: socket.setdefaulttimeout(old_timeout) + + out = [] + for entry in feed.entries: + title = getattr(entry, "title", "") or "" + raw_summary = getattr(entry, "summary", "") or getattr(entry, "description", "") or "" + summary = _strip_html(raw_summary) + link = getattr(entry, "link", "") or "" + text = f"{title} {summary}" + if not _matches_keywords(text): + continue + published = _parse_date(entry) + cat, sev = classify_and_severity(text) + out.append({ + "title": title, + "summary": summary[:400] if summary else title, + "url": link, + "published": published, + "category": cat, + "severity": sev, + "source": name, + }) + return out + + +def fetch_all() -> list[dict]: + """抓取所有配置的 RSS 源,按源超时与隔离错误,全局去重后返回。""" + sources = get_feed_sources() + if not sources: + return [] + + items: list[dict] = [] + seen: set[tuple[str, str]] = set() + + for name, url in sources: + batch = _fetch_one_feed(name, url, FEED_TIMEOUT) + for item in batch: + key = (item["title"][:80], item["url"]) + if key in seen: + continue + seen.add(key) + # 写入 DB 的 schema 不包含 source,可后续扩展 + items.append({k: v for k, v in item.items() if k != "source"}) + return items