Files
AIcreat/app/services/wechat.py
2026-04-06 14:20:53 +08:00

136 lines
5.1 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
from __future__ import annotations
import logging
import time
import httpx
import markdown2
from app.config import settings
from app.schemas import PublishResponse, WechatPublishRequest
# Module-level logger, named after this module's import path.
logger = logging.getLogger(__name__)
def _detail_for_token_error(data: dict | None) -> str:
    """Turn a WeChat token-endpoint error payload into an actionable message.

    Args:
        data: Parsed JSON body returned by WeChat, or ``None`` / an empty
            dict when nothing usable came back.

    Returns:
        A user-facing explanation. Errcodes 40164, 40013 and 40125 get a
        dedicated hint; anything else falls back to a generic message that
        echoes the raw errcode and errmsg.
    """
    if not data:
        return "获取微信 access_token 失败(无返回内容)"

    errcode = data.get("errcode")
    # A missing or null errmsg is treated as an empty string.
    errmsg = (data.get("errmsg") or "").strip()

    if errcode == 40164:
        # Caller IP is not on the account's IP whitelist.
        detail = (
            "微信 errcode=40164当前请求使用的出口 IP 未在公众号「IP 白名单」中。"
            "请到 微信公众平台 → 设置与开发 → 基本配置 → IP 白名单,添加本服务对外的公网 IP"
            "(日志里 invalid ip 后面的地址)。若在本地/Docker 调试,出口 IP 常会变,需填当前出口或改用固定出口的服务器。"
            f" 微信原文:{errmsg}"
        )
    elif errcode == 40013:
        detail = f"微信 errcode=40013AppSecret 无效或已重置,请检查 WECHAT_SECRET。{errmsg}"
    elif errcode == 40125:
        detail = f"微信 errcode=40125AppSecret 配置错误。{errmsg}"
    else:
        detail = f"获取微信 access_token 失败errcode={errcode} errmsg={errmsg}"
    return detail
class WechatPublisher:
    """Publish Markdown articles to a WeChat Official Account draft box.

    The access_token is cached on the instance and refreshed once it is
    within 60 seconds of its recorded expiry. If WeChat rejects the cached
    token on a draft call, the cache is dropped so the next call fetches a
    fresh one.
    """

    # WeChat errcodes that mean the access_token itself was rejected:
    # 40001 invalid credential, 40014 invalid access_token, 42001 expired.
    _TOKEN_REJECTED_ERRCODES = frozenset({40001, 40014, 42001})

    def __init__(self) -> None:
        self._access_token: str | None = None
        # Unix timestamp (seconds) at which the cached token expires.
        self._expires_at: int = 0

    async def publish_draft(self, req: WechatPublishRequest, request_id: str = "") -> PublishResponse:
        """Render ``req`` to HTML and add it as a draft via ``cgi-bin/draft/add``.

        Args:
            req: Article to publish; ``title``, ``author``, ``summary`` and
                ``body_markdown`` are read from it.
            request_id: Correlation id used in log lines; "-" when empty.

        Returns:
            ``PublishResponse`` with ``ok=True`` when WeChat reports
            errcode 0 (or no errcode), otherwise ``ok=False`` with a
            human-readable ``detail``.

        Raises:
            httpx.HTTPError: Network-level failures are not caught here.
        """
        rid = request_id or "-"
        if not settings.wechat_appid or not settings.wechat_secret:
            logger.warning("wechat skipped rid=%s reason=missing_appid_or_secret", rid)
            return PublishResponse(ok=False, detail="缺少 WECHAT_APPID / WECHAT_SECRET 配置")

        token, token_from_cache, token_err_body = await self._get_access_token()
        if not token:
            detail = _detail_for_token_error(token_err_body)
            logger.error("wechat access_token_unavailable rid=%s detail=%s", rid, detail[:200])
            return PublishResponse(ok=False, detail=detail, data=token_err_body)
        logger.info(
            "wechat_token rid=%s cache_hit=%s",
            rid,
            token_from_cache,
        )

        html = markdown2.markdown(req.body_markdown)
        logger.info(
            "wechat_draft_build rid=%s title_chars=%d digest_chars=%d html_chars=%d",
            rid,
            len(req.title or ""),
            len(req.summary or ""),
            len(html or ""),
        )
        payload = {
            "articles": [
                {
                    "title": req.title,
                    "author": req.author or settings.wechat_author,
                    "digest": req.summary,
                    "content": html,
                    "content_source_url": "",
                    "need_open_comment": 0,
                    "only_fans_can_comment": 0,
                }
            ]
        }

        async with httpx.AsyncClient(timeout=25) as client:
            logger.info(
                "wechat_http_post rid=%s endpoint=cgi-bin/draft/add http_timeout_s=25",
                rid,
            )
            # Pass the token as a query parameter rather than formatting it
            # into the URL, matching how the token request passes its params.
            r = await client.post(
                "https://api.weixin.qq.com/cgi-bin/draft/add",
                params={"access_token": token},
                json=payload,
            )

        data = self._parse_json_dict(r)
        if data is None:
            # E.g. an HTML error page from a gateway: report it as a failed
            # publish instead of raising from response parsing.
            logger.warning(
                "wechat_draft_bad_response rid=%s http_status=%s",
                rid,
                r.status_code,
            )
            return PublishResponse(
                ok=False,
                detail=f"微信发布失败: 响应不是合法 JSON(HTTP {r.status_code})",
            )

        errcode = data.get("errcode", 0)
        if errcode != 0:
            if self._drop_rejected_token(errcode):
                # Without this, every call would keep reusing the rejected
                # token until its locally recorded expiry.
                logger.info("wechat_token_invalidated rid=%s errcode=%s", rid, errcode)
            logger.warning(
                "wechat_draft_failed rid=%s errcode=%s errmsg=%s raw=%s",
                rid,
                data.get("errcode"),
                data.get("errmsg"),
                data,
            )
            return PublishResponse(ok=False, detail=f"微信发布失败: {data}", data=data)

        logger.info(
            "wechat_draft_ok rid=%s media_id=%s",
            rid,
            data.get("media_id", data),
        )
        return PublishResponse(ok=True, detail="已发布到公众号草稿箱", data=data)

    async def _get_access_token(self) -> tuple[str | None, bool, dict | None]:
        """Return ``(token, from_cache, error_body)``.

        On success the third item is ``None``. On failure the token is
        ``None`` and the third item is the JSON object returned by WeChat
        (containing errcode/errmsg), or ``None`` when the body was not a
        JSON object.

        Raises:
            httpx.HTTPError: Network-level failures are not caught here.
        """
        now = int(time.time())
        # Reuse the cached token unless it is within 60 s of expiring.
        if self._access_token and now < self._expires_at - 60:
            return self._access_token, True, None

        logger.info("wechat_http_get endpoint=cgi-bin/token reason=refresh_access_token")
        async with httpx.AsyncClient(timeout=20) as client:
            r = await client.get(
                "https://api.weixin.qq.com/cgi-bin/token",
                params={
                    "grant_type": "client_credential",
                    "appid": settings.wechat_appid,
                    "secret": settings.wechat_secret,
                },
            )

        # An empty body is treated as an empty object; a non-JSON or
        # non-object body yields None.
        data = self._parse_json_dict(r) if r.content else {}
        token = data.get("access_token") if data is not None else None
        if not token:
            logger.warning(
                "wechat_token_refresh_failed http_status=%s body=%s",
                r.status_code,
                data,
            )
            return None, False, data

        self._access_token = token
        # Fall back to 7200 s when the response omits expires_in.
        self._expires_at = now + int(data.get("expires_in", 7200))
        return token, False, None

    def _drop_rejected_token(self, errcode: object) -> bool:
        """Clear the cached token if ``errcode`` says WeChat rejected it.

        Returns:
            ``True`` when the cache was cleared, ``False`` otherwise.
        """
        if not isinstance(errcode, int) or errcode not in self._TOKEN_REJECTED_ERRCODES:
            return False
        self._access_token = None
        self._expires_at = 0
        return True

    @staticmethod
    def _parse_json_dict(response: httpx.Response) -> dict | None:
        """Return the response body as a dict, or ``None`` if it is not a JSON object."""
        try:
            parsed = response.json()
        except ValueError:
            # json.JSONDecodeError and UnicodeDecodeError both derive from ValueError.
            return None
        return parsed if isinstance(parsed, dict) else None