111 lines
3.2 KiB
Python
111 lines
3.2 KiB
Python
# -*- coding: utf-8 -*-
|
|
"""写入 SQLite 并确保 situation_update 表存在"""
|
|
import sqlite3
|
|
import hashlib
|
|
import os
|
|
from datetime import datetime, timezone
|
|
|
|
from config import DB_PATH
|
|
|
|
CATEGORIES = ("deployment", "alert", "intel", "diplomatic", "other")
|
|
SEVERITIES = ("low", "medium", "high", "critical")
|
|
|
|
|
|
def _ensure_table(conn: sqlite3.Connection) -> None:
|
|
conn.execute("""
|
|
CREATE TABLE IF NOT EXISTS situation_update (
|
|
id TEXT PRIMARY KEY,
|
|
timestamp TEXT NOT NULL,
|
|
category TEXT NOT NULL,
|
|
summary TEXT NOT NULL,
|
|
severity TEXT NOT NULL
|
|
)
|
|
""")
|
|
conn.commit()
|
|
|
|
|
|
def _make_id(title: str, url: str, published: str) -> str:
|
|
raw = f"{title}|{url}|{published}"
|
|
return "nw_" + hashlib.sha256(raw.encode("utf-8")).hexdigest()[:16]
|
|
|
|
|
|
def _to_utc_iso(dt: datetime) -> str:
|
|
if dt.tzinfo:
|
|
dt = dt.astimezone(timezone.utc)
|
|
return dt.strftime("%Y-%m-%dT%H:%M:%S.000Z")
|
|
|
|
|
|
def insert_update(
|
|
conn: sqlite3.Connection,
|
|
title: str,
|
|
summary: str,
|
|
url: str,
|
|
published: datetime,
|
|
category: str = "other",
|
|
severity: str = "medium",
|
|
) -> bool:
|
|
"""插入一条更新,若 id 已存在则跳过。返回是否插入了新记录。"""
|
|
_ensure_table(conn)
|
|
ts = _to_utc_iso(published)
|
|
uid = _make_id(title, url, ts)
|
|
if category not in CATEGORIES:
|
|
category = "other"
|
|
if severity not in SEVERITIES:
|
|
severity = "medium"
|
|
try:
|
|
conn.execute(
|
|
"INSERT OR IGNORE INTO situation_update (id, timestamp, category, summary, severity) VALUES (?, ?, ?, ?, ?)",
|
|
(uid, ts, category, summary[:500], severity),
|
|
)
|
|
conn.commit()
|
|
return conn.total_changes > 0
|
|
except Exception:
|
|
conn.rollback()
|
|
return False
|
|
|
|
|
|
def touch_situation_updated_at(conn: sqlite3.Connection) -> None:
|
|
"""更新 situation 表的 updated_at"""
|
|
conn.execute(
|
|
"INSERT OR REPLACE INTO situation (id, data, updated_at) VALUES (1, '{}', ?)",
|
|
(datetime.utcnow().strftime("%Y-%m-%dT%H:%M:%S.000Z"),),
|
|
)
|
|
conn.commit()
|
|
|
|
|
|
def write_updates(updates: list[dict]) -> int:
|
|
"""
|
|
updates: [{"title","summary","url","published","category","severity"}, ...]
|
|
返回新增条数。
|
|
"""
|
|
if not os.path.exists(DB_PATH):
|
|
return 0
|
|
conn = sqlite3.connect(DB_PATH, timeout=10)
|
|
try:
|
|
count = 0
|
|
for u in updates:
|
|
pub = u.get("published")
|
|
if isinstance(pub, str):
|
|
try:
|
|
pub = datetime.fromisoformat(pub.replace("Z", "+00:00"))
|
|
except ValueError:
|
|
pub = datetime.utcnow()
|
|
elif pub is None:
|
|
pub = datetime.utcnow()
|
|
ok = insert_update(
|
|
conn,
|
|
title=u.get("title", "")[:200],
|
|
summary=u.get("summary", "") or u.get("title", ""),
|
|
url=u.get("url", ""),
|
|
published=pub,
|
|
category=u.get("category", "other"),
|
|
severity=u.get("severity", "medium"),
|
|
)
|
|
if ok:
|
|
count += 1
|
|
if count > 0:
|
|
touch_situation_updated_at(conn)
|
|
return count
|
|
finally:
|
|
conn.close()
|