fix: change the database package

Daniel
2026-03-03 14:49:02 +08:00
parent 85dea726e9
commit 29c921f498
10 changed files with 338 additions and 461 deletions

server/README.md

@@ -0,0 +1,170 @@
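# Backend runtime logic
The backend is **Node.js Express + SQLite + WebSocket**. It shares a single database file with the Python crawler and provides the "situation data" API, real-time push, and simple statistics.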
---
## 1. How to start
```bash
npm run api # start server/index.js, default port 3001
```
- Port: `process.env.API_PORT || 3001`
- Database: `process.env.DB_PATH`, falling back to `server/data.db` (shared with the crawler)
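
As a minimal sketch of how these two settings resolve, the helper below mirrors the fallbacks listed above. `resolveConfig` and its `baseDir` parameter are hypothetical names introduced for illustration; they are not part of the server code, and the numeric coercion of the port is an added assumption.

```javascript
const path = require('path')

// Hypothetical helper: resolve the port and database path with the
// fallbacks described above (API_PORT -> 3001, DB_PATH -> <baseDir>/data.db).
// The port is coerced to a number, which the original expression does not do.
function resolveConfig(env, baseDir) {
  return {
    port: Number(env.API_PORT) || 3001,
    dbPath: env.DB_PATH || path.join(baseDir, 'data.db'),
  }
}

// Example: no overrides set, so both defaults apply.
const config = resolveConfig({}, 'server')
console.log(config.port, config.dbPath)
```

Passing `process.env` and the server directory would give the values the running process uses.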
---
## 2. Overall architecture
```
                  ┌─────────────────────────────────────────┐
                  │             server/index.js             │
                  │    (HTTP Server + WebSocket Server)     │
                  └────────────────────┬────────────────────┘
         ┌─────────────────────────────┼─────────────────────────────┐
         │                             │                             │
         ▼                             ▼                             ▼
  ┌─────────────┐               ┌─────────────┐               ┌─────────────┐
  │  /api/*     │               │  /ws        │               │ static dist │
  │  routes.js  │               │  WebSocket  │               │ (production)│
  └──────┬──────┘               └──────┬──────┘               └─────────────┘
         │                             │
         │ read/write                  │ broadcast situation + stats
         ▼                             │
  ┌─────────────┐                      │
  │  db.js      │◄─────────────────────┘
  │  (SQLite)   │   getSituation() / getStats()
  └──────┬──────┘
         │ same file: data.db
  ┌────────────────┐
  │ Python crawler │  fetch → dedupe → AI cleaning → map to DB fields → write tables → POST /api/crawler/notify
  │ situation_     │  (main.py or the gdelt service; writes situation_update / news_content / combat_losses, etc.)
  │ update, etc.   │
  └────────────────┘
```
---
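## 3. Core modules
| File | Purpose |
|------|------|
| **index.js** | Creates the HTTP + WebSocket server; mounts routes and static assets; runs the periodic broadcast and the crawler-notify callback |
| **routes.js** | All `/api/*` endpoints: situation, db/dashboard, visit, feedback, share, stats, events, news, etc. |
| **situationData.js** | `getSituation()`: aggregates multiple tables into the "situation" JSON the frontend needs (forces, bases, combat losses, event timeline, GDELT, etc.) |
| **db.js** | SQLite connection, table creation, migrations (better-sqlite3, WAL mode) |
| **stats.js** | `getStats()`: current viewers, total visits, feedback count, share count |
| **openapi.js** | Swagger/OpenAPI document definition |
| **seed.js** | Initializes/resets seed data (can be run on its own with `npm run api:seed`) |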
---
## 4. Data flow (read)
1. **The frontend wants the full-page situation**
   - Request `GET /api/situation` → `routes.js` calls `getSituation()`.
   - `situationData.js` reads from the db: `force_summary`, `power_index`, `force_asset`, `key_location`, `combat_losses`, `wall_street_trend`, `retaliation_*`, `situation_update` (latest 50 rows), `gdelt_events`, `conflict_stats`.
   - It assembles `{ lastUpdated, usForces, iranForces, recentUpdates, conflictEvents, conflictStats, civilianCasualtiesTotal }` and returns it.
2. **The frontend wants the event list**
   - `GET /api/events` returns `conflictEvents` + `conflict_stats` + `updated_at` (also derived from the getSituation data).
3. **The frontend wants raw table data**
   - `GET /api/db/dashboard` returns the `SELECT *` results of several tables (including `situation_update`), for use by the `/db` debug page.
4. **WebSocket**
   - On connecting to `ws://host/ws`, the client immediately receives one `{ type: 'situation', data: getSituation(), stats: getStats() }` message.
   - After that, the server broadcasts the same structure every 3 seconds, which the frontend can use for live refresh.
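
On the client side, handling these pushes could look roughly like the sketch below. `applyMessage` and the state shape are hypothetical, and the sample frame uses made-up values; only the `{ type: 'situation', data, stats }` envelope comes from the description above.

```javascript
// Hypothetical client-side reducer: fold one WebSocket message into UI state.
function applyMessage(state, raw) {
  let msg
  try {
    msg = JSON.parse(raw)
  } catch (_) {
    return state // ignore frames that are not valid JSON
  }
  if (!msg || msg.type !== 'situation') return state
  return { situation: msg.data, stats: msg.stats }
}

// Example frame shaped like the server's push; the field values are made up.
const frame = JSON.stringify({
  type: 'situation',
  data: { lastUpdated: '2026-03-03T06:49:02Z', recentUpdates: [] },
  stats: {},
})
const next = applyMessage({ situation: null, stats: null }, frame)
console.log(next.situation.lastUpdated)
```

Keeping the handler a pure function means the same code can serve both the initial message and every later broadcast.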
---
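## 5. Data flow (write)
### 5.1 Crawler-side write path (recommended reading order)
The full path by which the crawler writes to the frontend database is shown below. It is **not** "fetch, then write straight to the tables"; data goes through deduplication, AI cleaning, and field mapping before it is persisted:
1. **The crawler fetches real-time data**
   - Fetches from RSS and similar sources (`scrapers/rss_scraper.fetch_all`), producing a list of raw items.
2. **Deduplication**
   - During fetch: within RSS, items are deduplicated by (title, url).
   - Before persisting: items are deduplicated against the `news_content` table by `content_hash(title, summary, url)`; only items that have **not appeared before** continue to the next steps (`news_storage.save_and_dedup`).
3. **After deduplication, items are sent to AI cleaning in batches**
   - For each item or batch that passed deduplication:
     - **Display cleaning**: translates the title/summary; `clean_news_for_panel` distills them into plain text of a length that fits the panel (e.g. summary ≤ 120 characters); `ensure_category` / `ensure_severity` normalize values to the frontend enums (`cleaner_ai`).
     - **Structured extraction** (optional): `extractor_ai` / `extractor_dashscope` / `extractor_rules` extract combat losses, base status, and so on from the news text and output structures that conform to `panel_schema`.
   - The result is "valid data": a human-readable summary/category/severity plus persistable combat_losses_delta, key_location, and so on.
4. **Valid data is mapped back to frontend database fields**
   - Event timeline: cleaned items are written to `situation_update` (`db_writer.write_updates`).
   - News archive: new items that survived deduplication are written to `news_content` (already done in step 2).
   - Structured data: AI extraction results are mapped onto the frontend table structure by `db_merge.merge`, updating `combat_losses`, `key_location`, `retaliation_*`, `wall_street_trend`, and so on (matching the fields used by `situationData.getSituation`).
5. **Update the tables and notify the backend**
   - Once the tables above are updated, the crawler calls **POST /api/crawler/notify**.
   - The backend (index.js) updates `situation.updated_at` and calls `broadcastSituation()`; the frontend receives the latest situation over WebSocket.

In practice, the **gdelt service** (`realtime_conflict_service`) works as follows: it first translates and cleans the fetched results, then `save_and_dedup` deduplicates and persists them to `news_content`; the new items that survive are written to `situation_update`; finally, AI extraction runs over those new items in batches and `db_merge.merge` writes the combat-loss, base, and related tables.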
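
The hash-based deduplication in step 2 can be sketched as follows. The real logic lives in the Python crawler (`news_storage.save_and_dedup`) and is not shown in this commit, so this JavaScript version is only an illustration: the SHA-256 scheme, the separator, and the names `contentHash` and `dedupe` are assumptions.

```javascript
const crypto = require('crypto')

// Assumed hashing scheme: SHA-256 over the three fields joined by a separator.
function contentHash(title, summary, url) {
  return crypto
    .createHash('sha256')
    .update([title, summary, url].join('\u0000'))
    .digest('hex')
}

// Keep only items whose hash is not already known. `seen` stands in for the
// content_hash column of news_content and is updated as new items pass.
function dedupe(items, seen) {
  const fresh = []
  for (const item of items) {
    const hash = contentHash(item.title, item.summary, item.url)
    if (seen.has(hash)) continue
    seen.add(hash)
    fresh.push({ ...item, contentHash: hash })
  }
  return fresh
}

const seen = new Set()
const batch = [
  { title: 'A', summary: 's', url: 'https://example.com/a' },
  { title: 'A', summary: 's', url: 'https://example.com/a' },
  { title: 'B', summary: 's', url: 'https://example.com/b' },
]
console.log(dedupe(batch, seen).length) // 2: the repeated item is dropped
```

Only the items returned by `dedupe` would move on to the cleaning and extraction steps.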
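### 5.2 User-action writes
- **POST /api/visit**: records the IP in `visits`, increments `visitor_count.total` by 1, and triggers one broadcast.
- **POST /api/feedback**: inserts a row into `feedback`.
- **POST /api/share**: increments `share_count.total` by 1.

These writes are performed in `routes.js` via `db.prepare().run()`.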
---
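## 6. API overview
| Method | Path | Description |
|------|------|------|
| GET | /api/health | Health check |
| GET | /api/situation | Full situation (for the main panel) |
| GET | /api/events | Conflict events + statistics |
| GET | /api/db/dashboard | Raw data from each table (for the /db page) |
| GET | /api/news | News list (`news_content` table) |
| GET | /api/stats | Current viewers / total visits / feedback count / share count |
| POST | /api/visit | Records a visit and returns stats |
| POST | /api/feedback | Submits feedback |
| POST | /api/share | Share count +1 |
| POST | /api/crawler/notify | Crawler notification: updates updated_at and broadcasts (internal use) |

- **Swagger**: `http://localhost:3001/api-docs`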
---
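## 7. WebSocket behavior
- **Path**: `/ws` (same port as HTTP).
- **On connect**: the server sends one `{ type: 'situation', data, stats }` message.
- **Periodic broadcast**: `setInterval(broadcastSituation, 3000)` pushes the latest `getSituation()` + `getStats()` to all connected clients every 3 seconds.
- **Crawler notification**: POST `/api/crawler/notify` runs `broadcastSituation()` immediately, without waiting for the 3-second tick.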
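
A broadcast of this kind can be sketched as below. The real `broadcastSituation` in index.js takes no arguments and is not shown in this commit; the parameterized signature and the stand-in client objects here are assumptions made so the example runs without a WebSocket server.

```javascript
const OPEN = 1 // readyState value of an open WebSocket

// Send one situation payload to every open client; returns how many were sent.
function broadcastSituation(clients, getSituation, getStats) {
  const payload = JSON.stringify({
    type: 'situation',
    data: getSituation(),
    stats: getStats(),
  })
  let sent = 0
  for (const client of clients) {
    if (client.readyState !== OPEN) continue
    client.send(payload)
    sent += 1
  }
  return sent
}

// Stand-in clients so the sketch runs without a real WebSocket server.
const received = []
const clients = [
  { readyState: OPEN, send: (m) => received.push(m) },
  { readyState: 3, send: (m) => received.push(m) }, // closed: skipped
]
const sent = broadcastSituation(clients, () => ({ lastUpdated: 'now' }), () => ({}))
console.log(sent, JSON.parse(received[0]).type)
```

Serializing the payload once per broadcast, rather than once per client, keeps the 3-second tick cheap as the number of connections grows.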
---
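## 8. Working with the crawler
- **Shared DB**: the backend and the crawler use the same `DB_PATH` (default `server/data.db`).
- **Crawler write path**: fetch → dedupe → AI cleaning yields valid data → map to frontend DB fields → update `situation_update`, `news_content`, `combat_losses`, `key_location`, `gdelt_events`, and other tables → call POST `/api/crawler/notify` to notify the backend.
- **Backend role**: it only reads these tables (`getSituation()` and so on) and pushes the result; it does not fetch, deduplicate, or run AI cleaning, and it does not schedule the crawler.

Overall, the backend is a "read + aggregate + push" service; writes come from the **crawler (after deduplication, AI cleaning, and field mapping)** and from **user actions** (visits, feedback, shares).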
---
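## 9. Local verification
1. **Start the backend**: `npm run api` (default port 3001).
2. **Check reads**: `curl -s http://localhost:3001/api/situation` should return JSON containing `lastUpdated` and `recentUpdates`.
3. **Check writes and notification**: after the crawler finishes its pipeline it POSTs `/api/crawler/notify`; the backend updates `situation.updated_at` and broadcasts. Request `/api/situation` again and check whether `lastUpdated` has changed.
4. **Inspect raw tables**: open `http://localhost:3001/api/db/dashboard` in a browser, or the frontend `/db` page, to view `situation_update`, `news_content`, and other tables.

For the full crawler-side verification steps, see the "Local verification" (本地验证链路) section of **crawler/README.md**; from the project root, `./scripts/verify-pipeline.sh` runs a one-shot check.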

Binary file not shown.



@@ -1,20 +1,67 @@
const Database = require('better-sqlite3')
/**
* SQLite wrapper: uses sql.js (pure JS/WebAssembly, no node-gyp required)
* The public interface is compatible with better-sqlite3 (db.prepare().get/all/run, db.exec)
*/
const path = require('path')
const fs = require('fs')
const dbPath = process.env.DB_PATH || path.join(__dirname, 'data.db')
const db = new Database(dbPath)
let _db = null
// Enable foreign keys
db.pragma('journal_mode = WAL')
function getDb() {
if (!_db) throw new Error('DB not initialized. Call initDb() first.')
return _db
}
// Create tables
db.exec(`
function wrapDatabase(nativeDb, persist) {
return {
prepare(sql) {
return {
get(...args) {
const stmt = nativeDb.prepare(sql)
stmt.bind(args.length ? args : null)
const row = stmt.step() ? stmt.getAsObject() : undefined
stmt.free()
return row
},
all(...args) {
const stmt = nativeDb.prepare(sql)
stmt.bind(args.length ? args : null)
const rows = []
while (stmt.step()) rows.push(stmt.getAsObject())
stmt.free()
return rows
},
run(...args) {
const stmt = nativeDb.prepare(sql)
stmt.bind(args.length ? args : null)
while (stmt.step());
stmt.free()
persist()
},
}
},
exec(sql) {
const statements = sql.split(';').map((s) => s.trim()).filter(Boolean)
statements.forEach((s) => nativeDb.run(s))
persist()
},
pragma(str) {
nativeDb.run('PRAGMA ' + str)
},
}
}
function runMigrations(db) {
const exec = (sql) => db.exec(sql)
const prepare = (sql) => db.prepare(sql)
exec(`
CREATE TABLE IF NOT EXISTS situation (
id INTEGER PRIMARY KEY CHECK (id = 1),
data TEXT NOT NULL,
updated_at TEXT NOT NULL DEFAULT (datetime('now'))
);
CREATE TABLE IF NOT EXISTS force_summary (
side TEXT PRIMARY KEY CHECK (side IN ('us', 'iran')),
total_assets INTEGER NOT NULL,
@@ -26,7 +73,6 @@ db.exec(`
missile_consumed INTEGER NOT NULL,
missile_stock INTEGER NOT NULL
);
CREATE TABLE IF NOT EXISTS power_index (
side TEXT PRIMARY KEY CHECK (side IN ('us', 'iran')),
overall INTEGER NOT NULL,
@@ -34,7 +80,6 @@ db.exec(`
economic_power INTEGER NOT NULL,
geopolitical_influence INTEGER NOT NULL
);
CREATE TABLE IF NOT EXISTS force_asset (
id TEXT PRIMARY KEY,
side TEXT NOT NULL CHECK (side IN ('us', 'iran')),
@@ -45,7 +90,6 @@ db.exec(`
lat REAL,
lng REAL
);
CREATE TABLE IF NOT EXISTS key_location (
id INTEGER PRIMARY KEY AUTOINCREMENT,
side TEXT NOT NULL CHECK (side IN ('us', 'iran')),
@@ -55,7 +99,6 @@ db.exec(`
type TEXT,
region TEXT
);
CREATE TABLE IF NOT EXISTS combat_losses (
side TEXT PRIMARY KEY CHECK (side IN ('us', 'iran')),
bases_destroyed INTEGER NOT NULL,
@@ -67,24 +110,20 @@ db.exec(`
armor INTEGER NOT NULL,
vehicles INTEGER NOT NULL
);
CREATE TABLE IF NOT EXISTS wall_street_trend (
id INTEGER PRIMARY KEY AUTOINCREMENT,
time TEXT NOT NULL,
value INTEGER NOT NULL
);
CREATE TABLE IF NOT EXISTS retaliation_current (
id INTEGER PRIMARY KEY CHECK (id = 1),
value INTEGER NOT NULL
);
CREATE TABLE IF NOT EXISTS retaliation_history (
id INTEGER PRIMARY KEY AUTOINCREMENT,
time TEXT NOT NULL,
value INTEGER NOT NULL
);
CREATE TABLE IF NOT EXISTS situation_update (
id TEXT PRIMARY KEY,
timestamp TEXT NOT NULL,
@@ -92,7 +131,6 @@ db.exec(`
summary TEXT NOT NULL,
severity TEXT NOT NULL
);
CREATE TABLE IF NOT EXISTS gdelt_events (
event_id TEXT PRIMARY KEY,
event_time TEXT NOT NULL,
@@ -103,7 +141,6 @@ db.exec(`
url TEXT,
created_at TEXT DEFAULT (datetime('now'))
);
CREATE TABLE IF NOT EXISTS conflict_stats (
id INTEGER PRIMARY KEY CHECK (id = 1),
total_events INTEGER NOT NULL DEFAULT 0,
@@ -112,7 +149,6 @@ db.exec(`
estimated_strike_count INTEGER NOT NULL DEFAULT 0,
updated_at TEXT NOT NULL
);
CREATE TABLE IF NOT EXISTS news_content (
id TEXT PRIMARY KEY,
content_hash TEXT NOT NULL UNIQUE,
@@ -125,57 +161,49 @@ db.exec(`
severity TEXT NOT NULL DEFAULT 'medium',
created_at TEXT NOT NULL DEFAULT (datetime('now'))
);
CREATE INDEX IF NOT EXISTS idx_news_content_hash ON news_content(content_hash);
CREATE INDEX IF NOT EXISTS idx_news_content_published ON news_content(published_at DESC);
`)
`)
try { exec('CREATE INDEX IF NOT EXISTS idx_news_content_hash ON news_content(content_hash)') } catch (_) {}
try { exec('CREATE INDEX IF NOT EXISTS idx_news_content_published ON news_content(published_at DESC)') } catch (_) {}
// Migration: add the type, region, status, and damage_level columns to an existing key_location table
try {
const cols = db.prepare('PRAGMA table_info(key_location)').all()
const names = cols.map((c) => c.name)
if (!names.includes('type')) db.exec('ALTER TABLE key_location ADD COLUMN type TEXT')
if (!names.includes('region')) db.exec('ALTER TABLE key_location ADD COLUMN region TEXT')
if (!names.includes('status')) db.exec('ALTER TABLE key_location ADD COLUMN status TEXT DEFAULT "operational"')
if (!names.includes('damage_level')) db.exec('ALTER TABLE key_location ADD COLUMN damage_level INTEGER')
} catch (_) {}
// Migration: add civilian casualties and updated_at to combat_losses
try {
const lossCols = db.prepare('PRAGMA table_info(combat_losses)').all()
const lossNames = lossCols.map((c) => c.name)
if (!lossNames.includes('civilian_killed')) db.exec('ALTER TABLE combat_losses ADD COLUMN civilian_killed INTEGER NOT NULL DEFAULT 0')
if (!lossNames.includes('civilian_wounded')) db.exec('ALTER TABLE combat_losses ADD COLUMN civilian_wounded INTEGER NOT NULL DEFAULT 0')
if (!lossNames.includes('updated_at')) db.exec('ALTER TABLE combat_losses ADD COLUMN updated_at TEXT DEFAULT (datetime("now"))')
if (!lossNames.includes('drones')) db.exec('ALTER TABLE combat_losses ADD COLUMN drones INTEGER NOT NULL DEFAULT 0')
if (!lossNames.includes('missiles')) db.exec('ALTER TABLE combat_losses ADD COLUMN missiles INTEGER NOT NULL DEFAULT 0')
if (!lossNames.includes('helicopters')) db.exec('ALTER TABLE combat_losses ADD COLUMN helicopters INTEGER NOT NULL DEFAULT 0')
if (!lossNames.includes('submarines')) db.exec('ALTER TABLE combat_losses ADD COLUMN submarines INTEGER NOT NULL DEFAULT 0')
if (!lossNames.includes('tanks')) db.exec('ALTER TABLE combat_losses ADD COLUMN tanks INTEGER NOT NULL DEFAULT 0')
if (!lossNames.includes('carriers')) {
db.exec('ALTER TABLE combat_losses ADD COLUMN carriers INTEGER NOT NULL DEFAULT 0')
db.exec('UPDATE combat_losses SET carriers = tanks')
}
if (!lossNames.includes('civilian_ships')) db.exec('ALTER TABLE combat_losses ADD COLUMN civilian_ships INTEGER NOT NULL DEFAULT 0')
if (!lossNames.includes('airport_port')) db.exec('ALTER TABLE combat_losses ADD COLUMN airport_port INTEGER NOT NULL DEFAULT 0')
} catch (_) {}
// Migration: add updated_at to all tables for data replay
const addUpdatedAt = (table) => {
try {
const cols = db.prepare(`PRAGMA table_info(${table})`).all()
if (!cols.some((c) => c.name === 'updated_at')) {
db.exec(`ALTER TABLE ${table} ADD COLUMN updated_at TEXT DEFAULT (datetime("now"))`)
}
const cols = prepare('PRAGMA table_info(key_location)').all()
const names = cols.map((c) => c.name)
if (!names.includes('type')) exec('ALTER TABLE key_location ADD COLUMN type TEXT')
if (!names.includes('region')) exec('ALTER TABLE key_location ADD COLUMN region TEXT')
if (!names.includes('status')) exec('ALTER TABLE key_location ADD COLUMN status TEXT DEFAULT "operational"')
if (!names.includes('damage_level')) exec('ALTER TABLE key_location ADD COLUMN damage_level INTEGER')
} catch (_) {}
try {
const lossCols = prepare('PRAGMA table_info(combat_losses)').all()
const lossNames = lossCols.map((c) => c.name)
if (!lossNames.includes('civilian_killed')) exec('ALTER TABLE combat_losses ADD COLUMN civilian_killed INTEGER NOT NULL DEFAULT 0')
if (!lossNames.includes('civilian_wounded')) exec('ALTER TABLE combat_losses ADD COLUMN civilian_wounded INTEGER NOT NULL DEFAULT 0')
if (!lossNames.includes('updated_at')) exec('ALTER TABLE combat_losses ADD COLUMN updated_at TEXT DEFAULT (datetime("now"))')
if (!lossNames.includes('drones')) exec('ALTER TABLE combat_losses ADD COLUMN drones INTEGER NOT NULL DEFAULT 0')
if (!lossNames.includes('missiles')) exec('ALTER TABLE combat_losses ADD COLUMN missiles INTEGER NOT NULL DEFAULT 0')
if (!lossNames.includes('helicopters')) exec('ALTER TABLE combat_losses ADD COLUMN helicopters INTEGER NOT NULL DEFAULT 0')
if (!lossNames.includes('submarines')) exec('ALTER TABLE combat_losses ADD COLUMN submarines INTEGER NOT NULL DEFAULT 0')
if (!lossNames.includes('tanks')) exec('ALTER TABLE combat_losses ADD COLUMN tanks INTEGER NOT NULL DEFAULT 0')
if (!lossNames.includes('carriers')) {
exec('ALTER TABLE combat_losses ADD COLUMN carriers INTEGER NOT NULL DEFAULT 0')
exec('UPDATE combat_losses SET carriers = tanks')
}
if (!lossNames.includes('civilian_ships')) exec('ALTER TABLE combat_losses ADD COLUMN civilian_ships INTEGER NOT NULL DEFAULT 0')
if (!lossNames.includes('airport_port')) exec('ALTER TABLE combat_losses ADD COLUMN airport_port INTEGER NOT NULL DEFAULT 0')
} catch (_) {}
}
addUpdatedAt('force_summary')
addUpdatedAt('power_index')
addUpdatedAt('force_asset')
addUpdatedAt('key_location')
addUpdatedAt('retaliation_current')
// Visitor stats: visits tracks current viewers (recently active IPs); visitor_count tracks cumulative visits (+1 per connection)
try {
db.exec(`
const addUpdatedAt = (table) => {
try {
const cols = prepare(`PRAGMA table_info(${table})`).all()
if (!cols.some((c) => c.name === 'updated_at')) {
exec(`ALTER TABLE ${table} ADD COLUMN updated_at TEXT DEFAULT (datetime("now"))`)
}
} catch (_) {}
}
;['force_summary', 'power_index', 'force_asset', 'key_location', 'retaliation_current'].forEach(addUpdatedAt)
try {
exec(`
CREATE TABLE IF NOT EXISTS visits (
ip TEXT PRIMARY KEY,
last_seen TEXT NOT NULL DEFAULT (datetime('now'))
@@ -185,30 +213,66 @@ try {
total INTEGER NOT NULL DEFAULT 0
);
INSERT OR IGNORE INTO visitor_count (id, total) VALUES (1, 0);
`)
} catch (_) {}
// Feedback messages: lets developers collect user feedback
try {
db.exec(`
`)
} catch (_) {}
try {
exec(`
CREATE TABLE IF NOT EXISTS feedback (
id INTEGER PRIMARY KEY AUTOINCREMENT,
content TEXT NOT NULL,
ip TEXT,
created_at TEXT NOT NULL DEFAULT (datetime('now'))
)
`)
} catch (_) {}
// Share count: cumulative number of shares
try {
db.exec(`
`)
} catch (_) {}
try {
exec(`
CREATE TABLE IF NOT EXISTS share_count (
id INTEGER PRIMARY KEY CHECK (id = 1),
total INTEGER NOT NULL DEFAULT 0
);
INSERT OR IGNORE INTO share_count (id, total) VALUES (1, 0);
`)
} catch (_) {}
`)
} catch (_) {}
}
module.exports = db
async function initDb() {
const initSqlJs = require('sql.js')
const SQL = await initSqlJs()
let data = new Uint8Array(0)
if (fs.existsSync(dbPath)) {
data = new Uint8Array(fs.readFileSync(dbPath))
}
const nativeDb = new SQL.Database(data)
function persist() {
try {
const buf = nativeDb.export()
fs.writeFileSync(dbPath, Buffer.from(buf))
} catch (e) {
console.error('[db] persist error:', e.message)
}
}
nativeDb.run('PRAGMA journal_mode = WAL')
const wrapped = wrapDatabase(nativeDb, persist)
runMigrations(wrapped)
_db = wrapped
return _db
}
const proxy = {
prepare(sql) {
return getDb().prepare(sql)
},
exec(sql) {
return getDb().exec(sql)
},
pragma(str) {
getDb().pragma(str)
},
}
module.exports = proxy
module.exports.initDb = initDb
module.exports.getDb = getDb
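
The `exec` wrapper above splits its input on every `;`. That works for the migrations in this file, but it would cut a statement in two if a string literal ever contained a semicolon. One way to guard against that is a quote-aware splitter, sketched below under the hypothetical name `splitStatements`. It is an illustration only, not part of the commit, and it does not handle SQL comments.

```javascript
// Split a SQL script into statements on ';', ignoring semicolons that appear
// inside single- or double-quoted literals. Comments are not handled.
function splitStatements(sql) {
  const statements = []
  let current = ''
  let quote = null // the quote character we are inside, or null
  for (const ch of sql) {
    if (quote) {
      current += ch
      if (ch === quote) quote = null
    } else if (ch === "'" || ch === '"') {
      quote = ch
      current += ch
    } else if (ch === ';') {
      if (current.trim()) statements.push(current.trim())
      current = ''
    } else {
      current += ch
    }
  }
  if (current.trim()) statements.push(current.trim())
  return statements
}

const script = "INSERT INTO feedback (content) VALUES ('a;b'); SELECT 1;"
console.log(splitStatements(script))
```

A doubled quote such as `'it''s'` is handled naturally: the first quote closes the literal and the second reopens it, so the text in between is never treated as a statement boundary.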


@@ -4,6 +4,7 @@ const fs = require('fs')
const express = require('express')
const cors = require('cors')
const { WebSocketServer } = require('ws')
const db = require('./db')
const routes = require('./routes')
const { getSituation } = require('./situationData')
@@ -67,7 +68,12 @@ function notifyCrawlerUpdate() {
} catch (_) {}
}
server.listen(PORT, () => {
console.log(`API + WebSocket running at http://localhost:${PORT}`)
console.log(`Swagger docs at http://localhost:${PORT}/api-docs`)
db.initDb().then(() => {
server.listen(PORT, () => {
console.log(`API + WebSocket running at http://localhost:${PORT}`)
console.log(`Swagger docs at http://localhost:${PORT}/api-docs`)
})
}).catch((err) => {
console.error('DB init failed:', err)
process.exit(1)
})


@@ -186,4 +186,7 @@ function seed() {
console.log('Seed completed.')
}
seed()
require('./db').initDb().then(() => seed()).catch((err) => {
console.error('Seed failed:', err)
process.exit(1)
})


@@ -155,6 +155,8 @@ function getSituation() {
})),
conflictStats,
civilianCasualtiesTotal,
// Top-level aggregate so that sit.combatLosses.us / sit.combatLosses.iran stay consistent with the values inside usForces/iranForces
combatLosses: { us: usLosses, iran: irLosses },
}
}