Files
AIcreat/app/services/im.py
2026-04-01 14:21:10 +08:00

55 lines
1.8 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
from __future__ import annotations
import hashlib
import hmac
import base64
import time
from urllib.parse import quote_plus
import httpx
from app.config import settings
from app.schemas import IMPublishRequest, PublishResponse
class IMPublisher:
async def publish(self, req: IMPublishRequest) -> PublishResponse:
if not settings.im_webhook_url:
return PublishResponse(ok=False, detail="缺少 IM_WEBHOOK_URL 配置")
webhook = self._with_signature(settings.im_webhook_url, settings.im_secret)
payload = {
"msg_type": "post",
"content": {
"post": {
"zh_cn": {
"title": req.title,
"content": [[{"tag": "text", "text": req.body_markdown[:3800]}]],
}
}
},
}
async with httpx.AsyncClient(timeout=20) as client:
r = await client.post(webhook, json=payload)
try:
data = r.json()
except Exception:
data = {"status_code": r.status_code, "text": r.text}
if r.status_code >= 400:
return PublishResponse(ok=False, detail=f"IM 推送失败: {data}", data=data)
return PublishResponse(ok=True, detail="IM 推送成功", data=data)
def _with_signature(self, webhook: str, secret: str | None) -> str:
# 兼容飞书 webhook 签名参数timestamp/sign
if not secret:
return webhook
ts = str(int(time.time()))
string_to_sign = f"{ts}\n{secret}".encode("utf-8")
sign = base64.b64encode(hmac.new(string_to_sign, b"", hashlib.sha256).digest()).decode("utf-8")
connector = "&" if "?" in webhook else "?"
return f"{webhook}{connector}timestamp={ts}&sign={quote_plus(sign)}"