Files
AIcreat/app/services/wechat.py
2026-04-06 15:28:15 +08:00

276 lines
12 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
from __future__ import annotations
import logging
import time
from io import BytesIO
from pathlib import Path
import httpx
import markdown2
from app.config import settings
from app.schemas import PublishResponse, WechatPublishRequest
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
# Cached bytes of the built-in default cover JPEG; stays None until
# _build_default_cover_jpeg() renders the image for the first time.
_default_cover_jpeg_bytes: bytes | None = None
def _build_default_cover_jpeg() -> tuple[bytes, str]:
    """Return the built-in fallback cover as ``(jpeg_bytes, filename)``.

    The cover is a plain light-grey 360x200 JPEG, a size chosen to satisfy
    WeChat's usual thumbnail requirements. It is rendered with Pillow on the
    first call and the encoded bytes are kept in the module-level cache
    ``_default_cover_jpeg_bytes`` for later calls.
    """
    global _default_cover_jpeg_bytes

    cached = _default_cover_jpeg_bytes
    if cached is None:
        # Imported lazily so Pillow is only loaded when the default cover is needed.
        from PIL import Image

        canvas = Image.new("RGB", (360, 200), (236, 240, 241))
        sink = BytesIO()
        canvas.save(sink, format="JPEG", quality=88)
        cached = sink.getvalue()
        _default_cover_jpeg_bytes = cached
    return cached, "cover_default.jpg"
def _detail_for_token_error(data: dict | None) -> str:
    """Translate a failed WeChat access-token response into an actionable message.

    Args:
        data: The JSON body WeChat returned for the token request, or
            ``None`` / an empty dict when no body was available.

    Returns:
        A user-facing explanation. Error codes 40164, 40013 and 40125 get
        dedicated guidance; every other code falls back to a generic message
        that echoes ``errcode`` and ``errmsg``.
    """
    if not data:
        return "获取微信 access_token 失败(无返回内容)"

    errcode = data.get("errcode")
    errmsg = (data.get("errmsg") or "").strip()

    ip_whitelist_hint = "".join(
        (
            "微信 errcode=40164当前请求使用的出口 IP 未在公众号「IP 白名单」中。",
            "请到 微信公众平台 → 设置与开发 → 基本配置 → IP 白名单,添加本服务对外的公网 IP",
            "(日志里 invalid ip 后面的地址)。若在本地/Docker 调试,出口 IP 常会变,需填当前出口或改用固定出口的服务器。",
            f" 微信原文:{errmsg}",
        )
    )
    explanations = (
        (40164, ip_whitelist_hint),
        (40013, f"微信 errcode=40013AppSecret 无效或已重置,请检查 WECHAT_SECRET。{errmsg}"),
        (40125, f"微信 errcode=40125AppSecret 配置错误。{errmsg}"),
    )
    # Compare with == (not a dict lookup) so unusual errcode values behave
    # exactly like a plain if-chain would.
    for known_code, explanation in explanations:
        if errcode == known_code:
            return explanation
    return f"获取微信 access_token 失败errcode={errcode} errmsg={errmsg}"
def _detail_for_draft_error(data: dict) -> str:
    """Translate a failed ``draft/add`` response into an actionable message.

    Args:
        data: The JSON object WeChat returned for the draft request.

    Returns:
        Dedicated guidance for errcode 40007 (invalid ``thumb_media_id``);
        otherwise a generic message that echoes ``errcode`` and ``errmsg``.
    """
    errcode = data.get("errcode")
    errmsg = (data.get("errmsg") or "").strip()

    if errcode != 40007:
        return f"微信草稿失败errcode={errcode} errmsg={errmsg}"

    guidance = (
        "微信 errcode=40007invalid media_idthumb_media_id 缺失、不是「永久图片素材」、或已失效。",
        "请核对 WECHAT_THUMB_MEDIA_ID 是否从素材管理里复制的永久素材;若不确定,可删掉该变量,",
        "由服务自动上传封面WECHAT_THUMB_IMAGE_PATH 或内置默认图)。",
        f" 微信原文:{errmsg}",
    )
    return "".join(guidance)
class WechatPublisher:
    """Client for the WeChat Official Account draft and permanent-material APIs.

    Each instance keeps two in-memory caches: the access token (reused until
    60 seconds before its reported expiry) and the ``media_id`` of the most
    recently uploaded cover image.
    """

    def __init__(self) -> None:
        self._access_token: str | None = None
        # Unix timestamp (seconds) at which the cached token expires.
        self._expires_at: int = 0
        # media_id of the last cover uploaded by this instance, if any.
        self._runtime_thumb_media_id: str | None = None

    @staticmethod
    def _decode_body(response: httpx.Response) -> object:
        """Return the parsed JSON body, or the raw text when it is not valid JSON."""
        try:
            return response.json()
        except ValueError:
            # json.JSONDecodeError / UnicodeDecodeError: e.g. a gateway HTML error page.
            return response.text

    @staticmethod
    def _build_draft_payload(
        title: str | None,
        author: str | None,
        summary: str | None,
        html: str,
        thumb_media_id: str,
    ) -> dict:
        """Build the ``draft/add`` request body for a single news article.

        ``None`` values are treated as empty strings. Title, author and digest
        are truncated to 32, 16 and 128 characters respectively.
        """
        # For article_type "news", thumb_media_id is mandatory and must be a
        # permanent material; otherwise WeChat answers errcode=40007.
        return {
            "articles": [
                {
                    "article_type": "news",
                    "title": (title or "")[:32],
                    "author": (author or "")[:16],
                    "digest": (summary or "")[:128],
                    "content": html,
                    "content_source_url": "",
                    "thumb_media_id": thumb_media_id,
                    "need_open_comment": 0,
                    "only_fans_can_comment": 0,
                }
            ]
        }

    async def _post_draft(self, draft_url: str, payload: dict) -> object:
        """POST ``payload`` to ``draft_url`` and return the decoded response body."""
        async with httpx.AsyncClient(timeout=25) as client:
            r = await client.post(draft_url, json=payload)
            return self._decode_body(r)

    async def publish_draft(self, req: WechatPublishRequest, request_id: str = "") -> PublishResponse:
        """Create a draft article in the Official Account draft box.

        The Markdown body is converted to HTML with ``markdown2``. The cover is
        taken from ``req.thumb_media_id`` when given, otherwise resolved via
        ``_resolve_thumb_media_id``. If WeChat rejects the cover with errcode
        40007 while ``WECHAT_THUMB_MEDIA_ID`` is configured, a fresh cover is
        uploaded and the request is retried once.

        Args:
            req: Article title, author, summary, Markdown body and optional cover id.
            request_id: Correlation id used in log lines; ``"-"`` when empty.

        Returns:
            ``PublishResponse`` with ``ok=True`` and WeChat's response on
            success, or ``ok=False`` with an explanatory ``detail`` on failure.
        """
        rid = request_id or "-"
        if not settings.wechat_appid or not settings.wechat_secret:
            logger.warning("wechat skipped rid=%s reason=missing_appid_or_secret", rid)
            return PublishResponse(ok=False, detail="缺少 WECHAT_APPID / WECHAT_SECRET 配置")

        token, token_from_cache, token_err_body = await self._get_access_token()
        if not token:
            detail = _detail_for_token_error(token_err_body)
            logger.error("wechat access_token_unavailable rid=%s detail=%s", rid, detail[:200])
            return PublishResponse(ok=False, detail=detail, data=token_err_body)
        logger.info(
            "wechat_token rid=%s cache_hit=%s",
            rid,
            token_from_cache,
        )

        req_thumb = (req.thumb_media_id or "").strip()
        if req_thumb:
            logger.info("wechat_thumb rid=%s source=request_media_id", rid)
            thumb_id = req_thumb
        else:
            thumb_id = await self._resolve_thumb_media_id(token, rid)
            if not thumb_id:
                return PublishResponse(
                    ok=False,
                    detail=(
                        "无法上传封面素材material/add_material 失败)。"
                        "请检查公众号是否开通素材接口权限,或手动在素材库上传后配置 WECHAT_THUMB_MEDIA_ID。"
                    ),
                )

        html = markdown2.markdown(req.body_markdown)
        logger.info(
            "wechat_draft_build rid=%s title_chars=%d digest_chars=%d html_chars=%d",
            rid,
            len(req.title or ""),
            len(req.summary or ""),
            len(html or ""),
        )
        payload = self._build_draft_payload(
            req.title,
            req.author or settings.wechat_author,
            req.summary,
            html,
            thumb_id,
        )

        explicit_used = bool((settings.wechat_thumb_media_id or "").strip())
        draft_url = f"https://api.weixin.qq.com/cgi-bin/draft/add?access_token={token}"
        logger.info(
            "wechat_http_post rid=%s endpoint=cgi-bin/draft/add http_timeout_s=25",
            rid,
        )
        data = await self._post_draft(draft_url, payload)

        if isinstance(data, dict) and data.get("errcode") == 40007 and explicit_used:
            logger.warning(
                "wechat_draft_40007_retry rid=%s hint=config_media_id_invalid_try_auto_upload",
                rid,
            )
            # Drop the cached cover and force a fresh upload, bypassing the configured id.
            self._runtime_thumb_media_id = None
            thumb_alt = await self._resolve_thumb_media_id(token, rid, force_skip_explicit=True)
            if thumb_alt:
                payload["articles"][0]["thumb_media_id"] = thumb_alt
                data = await self._post_draft(draft_url, payload)

        if not isinstance(data, dict):
            # Body was not a JSON object (e.g. an HTML error page): report it
            # instead of failing on data.get().
            snippet = str(data)[:200]
            logger.warning("wechat_draft_failed rid=%s reason=non_json_object_body raw=%s", rid, snippet)
            return PublishResponse(ok=False, detail=f"微信发布失败: {snippet}")

        if data.get("errcode", 0) != 0:
            logger.warning(
                "wechat_draft_failed rid=%s errcode=%s errmsg=%s raw=%s",
                rid,
                data.get("errcode"),
                data.get("errmsg"),
                data,
            )
            return PublishResponse(ok=False, detail=_detail_for_draft_error(data), data=data)

        logger.info(
            "wechat_draft_ok rid=%s media_id=%s",
            rid,
            data.get("media_id", data),
        )
        return PublishResponse(ok=True, detail="已发布到公众号草稿箱", data=data)

    async def upload_cover(self, filename: str, content: bytes, request_id: str = "") -> PublishResponse:
        """Upload a cover image as a permanent material.

        On success the new ``media_id`` is cached on the instance and returned
        as ``data["thumb_media_id"]``.

        Args:
            filename: Name sent with the upload; a ``.png`` suffix selects the
                PNG content type, anything else is sent as JPEG.
            content: Raw image bytes; must not be empty.
            request_id: Correlation id used in log lines; ``"-"`` when empty.

        Returns:
            ``PublishResponse`` describing success or the reason for failure.
        """
        rid = request_id or "-"
        if not settings.wechat_appid or not settings.wechat_secret:
            return PublishResponse(ok=False, detail="缺少 WECHAT_APPID / WECHAT_SECRET 配置")
        if not content:
            return PublishResponse(ok=False, detail="封面文件为空")

        token, _, token_err_body = await self._get_access_token()
        if not token:
            return PublishResponse(ok=False, detail=_detail_for_token_error(token_err_body), data=token_err_body)

        async with httpx.AsyncClient(timeout=60) as client:
            mid = await self._upload_permanent_image(client, token, content, filename)
        if not mid:
            return PublishResponse(
                ok=False,
                detail="封面上传失败:请检查图片格式/大小,或查看日志中的 wechat_material_add_failed",
            )

        self._runtime_thumb_media_id = mid
        logger.info("wechat_cover_upload_ok rid=%s filename=%s media_id=%s", rid, filename, mid)
        return PublishResponse(ok=True, detail="封面上传成功", data={"thumb_media_id": mid, "filename": filename})

    async def _upload_permanent_image(
        self, client: httpx.AsyncClient, token: str, content: bytes, filename: str
    ) -> str | None:
        """Upload ``content`` via ``material/add_material`` and return its ``media_id``.

        Returns ``None`` (after logging a warning) when WeChat reports an
        error, the body is not a JSON object, or no ``media_id`` is present.
        """
        url = f"https://api.weixin.qq.com/cgi-bin/material/add_material?access_token={token}&type=image"
        ctype = "image/png" if filename.lower().endswith(".png") else "image/jpeg"
        files = {"media": (filename, content, ctype)}
        r = await client.post(url, files=files)
        data = self._decode_body(r)
        if not isinstance(data, dict) or data.get("errcode"):
            logger.warning("wechat_material_add_failed body=%s", data)
            return None
        mid = data.get("media_id")
        if not mid:
            logger.warning("wechat_material_add_no_media_id body=%s", data)
            return None
        return mid

    async def _resolve_thumb_media_id(self, token: str, rid: str, *, force_skip_explicit: bool = False) -> str | None:
        """Pick or create the permanent image material used as the draft cover.

        Resolution order:
          1. ``settings.wechat_thumb_media_id`` (configured id);
          2. the ``media_id`` cached from an earlier upload by this instance;
          3. upload of the file at ``settings.wechat_thumb_image_path``;
          4. upload of the built-in default JPEG.

        Args:
            token: Valid access token.
            rid: Correlation id used in log lines.
            force_skip_explicit: When true, steps 1 and 2 are skipped so a new
                image is always uploaded.

        Returns:
            The ``media_id`` to use, or ``None`` when every upload failed.
        """
        explicit = (settings.wechat_thumb_media_id or "").strip()
        if explicit and not force_skip_explicit:
            logger.info("wechat_thumb rid=%s source=config_media_id", rid)
            return explicit
        if self._runtime_thumb_media_id and not force_skip_explicit:
            logger.info("wechat_thumb rid=%s source=runtime_cache", rid)
            return self._runtime_thumb_media_id

        path = (settings.wechat_thumb_image_path or "").strip()
        async with httpx.AsyncClient(timeout=60) as client:
            if path:
                p = Path(path)
                if p.is_file():
                    content = p.read_bytes()
                    mid = await self._upload_permanent_image(client, token, content, p.name)
                    if mid:
                        self._runtime_thumb_media_id = mid
                        logger.info("wechat_thumb rid=%s source=path_upload ok=1", rid)
                        return mid
                    logger.warning("wechat_thumb rid=%s source=path_upload ok=0 path=%s", rid, path)
                else:
                    logger.warning("wechat_thumb rid=%s path_not_found=%s", rid, path)

            # Fall back to the built-in default cover.
            content, fname = _build_default_cover_jpeg()
            mid = await self._upload_permanent_image(client, token, content, fname)
            if mid:
                self._runtime_thumb_media_id = mid
                logger.info("wechat_thumb rid=%s source=default_jpeg_upload ok=1", rid)
                return mid

        logger.error("wechat_thumb rid=%s source=default_jpeg_upload ok=0", rid)
        return None

    async def _get_access_token(self) -> tuple[str | None, bool, dict | None]:
        """Return ``(token, from_cache, error_body)``.

        On success ``error_body`` is ``None``. On failure ``token`` is ``None``
        and ``error_body`` is the JSON object WeChat returned (containing
        ``errcode`` / ``errmsg``), an empty dict when the response had no body,
        or ``None`` when the body was not a JSON object.
        """
        now = int(time.time())
        # Refresh 60 seconds early so a token never expires mid-request.
        if self._access_token and now < self._expires_at - 60:
            return self._access_token, True, None

        logger.info("wechat_http_get endpoint=cgi-bin/token reason=refresh_access_token")
        async with httpx.AsyncClient(timeout=20) as client:
            r = await client.get(
                "https://api.weixin.qq.com/cgi-bin/token",
                params={
                    "grant_type": "client_credential",
                    "appid": settings.wechat_appid,
                    "secret": settings.wechat_secret,
                },
            )

        data = self._decode_body(r) if r.content else {}
        # Validate the shape before any .get() so a non-object body is
        # reported as a failure instead of raising AttributeError.
        body = data if isinstance(data, dict) else None
        token = body.get("access_token") if body else None
        if not token:
            logger.warning(
                "wechat_token_refresh_failed http_status=%s body=%s",
                r.status_code,
                data,
            )
            return None, False, body

        self._access_token = token
        self._expires_at = now + int(body.get("expires_in", 7200))
        return token, False, None