from __future__ import annotations

import json
import logging
import time
from io import BytesIO
from pathlib import Path

import httpx
import markdown2

from app.config import settings
from app.schemas import PublishResponse, WechatPublishRequest

logger = logging.getLogger(__name__)

# Lazily built bytes of the built-in fallback cover; filled on first use.
_default_cover_jpeg_bytes: bytes | None = None


def _build_default_cover_jpeg() -> tuple[bytes, str]:
    """Return the built-in fallback cover as ``(jpeg_bytes, filename)``.

    The image is a plain light-grey 360x200 JPEG, rendered once per process
    and then served from a module-level cache. The size is intended to meet
    WeChat's usual thumbnail requirements.
    """
    global _default_cover_jpeg_bytes
    if _default_cover_jpeg_bytes is None:
        # Imported lazily so Pillow is only needed when the fallback is used.
        from PIL import Image

        im = Image.new("RGB", (360, 200), (236, 240, 241))
        buf = BytesIO()
        im.save(buf, format="JPEG", quality=88)
        _default_cover_jpeg_bytes = buf.getvalue()
    return _default_cover_jpeg_bytes, "cover_default.jpg"


def _json_object(resp: httpx.Response) -> dict | None:
    """Return the response body parsed as a JSON object.

    Returns ``None`` when the body is not valid JSON or is valid JSON but
    not an object (for example an HTML error page from a gateway), so
    callers can report a normal failure instead of raising.
    """
    try:
        parsed = resp.json()
    except ValueError:  # json.JSONDecodeError / UnicodeDecodeError
        return None
    return parsed if isinstance(parsed, dict) else None


async def _post_json_utf8(client: httpx.AsyncClient, url: str, payload: dict) -> httpx.Response:
    """POST ``payload`` as UTF-8 JSON without ``\\uXXXX`` escaping.

    The body is encoded explicitly because some httpx versions escape
    non-ASCII characters when ``json=`` is used. WeChat is reported to store
    such escapes literally in the draft (NOTE(review): confirm against the
    WeChat docs); unescaped UTF-8 is valid JSON either way.
    """
    body = json.dumps(payload, ensure_ascii=False).encode("utf-8")
    return await client.post(url, content=body, headers={"Content-Type": "application/json"})


def _detail_for_token_error(data: dict | None) -> str:
    """Turn a WeChat token-endpoint error body into an actionable message.

    Args:
        data: The JSON object returned by WeChat, or ``None``/empty when no
            usable body was received.

    Returns:
        A user-facing explanation; errcodes 40164, 40013 and 40125 get
        specific guidance, anything else a generic message.
    """
    if not data:
        return "获取微信 access_token 失败(无返回内容)"
    code = data.get("errcode")
    msg = (data.get("errmsg") or "").strip()
    if code == 40164:
        return (
            "微信 errcode=40164:当前请求使用的出口 IP 未在公众号「IP 白名单」中。"
            "请到 微信公众平台 → 设置与开发 → 基本配置 → IP 白名单,添加本服务对外的公网 IP"
            "(日志里 invalid ip 后面的地址)。若在本地/Docker 调试,出口 IP 常会变,需填当前出口或改用固定出口的服务器。"
            f" 微信原文:{msg}"
        )
    if code == 40013:
        return f"微信 errcode=40013:AppSecret 无效或已重置,请检查 WECHAT_SECRET。{msg}"
    if code == 40125:
        return f"微信 errcode=40125:AppSecret 配置错误。{msg}"
    return f"获取微信 access_token 失败:errcode={code} errmsg={msg}"


def _detail_for_draft_error(data: dict) -> str:
    """Turn a WeChat ``draft/add`` error body into a user-facing message.

    errcode 40007 (invalid media_id) gets specific guidance about the
    thumbnail configuration; every other code gets a generic message.
    """
    code = data.get("errcode")
    msg = (data.get("errmsg") or "").strip()
    if code == 40007:
        return (
            "微信 errcode=40007(invalid media_id):thumb_media_id 缺失、不是「永久图片素材」、或已失效。"
            "请核对 WECHAT_THUMB_MEDIA_ID 是否从素材管理里复制的永久素材;若不确定,可删掉该变量,"
            "由服务自动上传封面(WECHAT_THUMB_IMAGE_PATH 或内置默认图)。"
            f" 微信原文:{msg}"
        )
    return f"微信草稿失败:errcode={code} errmsg={msg}"


class WechatPublisher:
    """Publishes Markdown articles to a WeChat Official Account draft box.

    Access tokens and automatically uploaded cover ids are cached in memory
    per appid, so one instance can serve several accounts without mixing up
    their credentials or materials. httpx transport errors (timeouts,
    connection failures) are not caught here and propagate to the caller.
    """

    def __init__(self) -> None:
        # appid -> {"token": str, "expires_at": unix seconds}
        self._token_cache: dict[str, dict[str, int | str]] = {}
        # appid -> media_id of the cover most recently uploaded for that
        # account. Keyed by appid because a media_id is sent back to the
        # account it was uploaded with; sharing one value across accounts
        # would hand account B a cover that was uploaded for account A.
        self._runtime_thumb_media_ids: dict[str, str] = {}

    def _resolve_account(self, account: dict | None = None) -> dict[str, str]:
        """Merge a per-call account override with the global settings.

        Each field is taken from ``account`` when truthy, otherwise from
        ``settings``; the result is always a stripped string (possibly empty).
        """
        src = account or {}

        def pick(key: str, fallback: str | None) -> str:
            return (src.get(key) or fallback or "").strip()

        return {
            "appid": pick("appid", settings.wechat_appid),
            "secret": pick("secret", settings.wechat_secret),
            "author": pick("author", settings.wechat_author),
            "thumb_media_id": pick("thumb_media_id", settings.wechat_thumb_media_id),
            "thumb_image_path": pick("thumb_image_path", settings.wechat_thumb_image_path),
        }

    async def publish_draft(
        self, req: WechatPublishRequest, request_id: str = "", account: dict | None = None
    ) -> PublishResponse:
        """Render the request's Markdown and add it as a draft via ``draft/add``.

        Args:
            req: Article to publish (title, summary, Markdown body, optional
                author and thumb_media_id).
            request_id: Correlation id used in log lines; "-" when empty.
            account: Optional per-call credentials/options overriding settings.

        Returns:
            ``PublishResponse`` with ``ok=True`` and WeChat's JSON on success,
            or ``ok=False`` with an explanatory ``detail`` on any failure
            (missing credentials, token error, cover upload failure, WeChat
            error code, or a non-JSON response).
        """
        rid = request_id or "-"
        acct = self._resolve_account(account)
        if not acct["appid"] or not acct["secret"]:
            logger.warning("wechat skipped rid=%s reason=missing_appid_or_secret", rid)
            return PublishResponse(ok=False, detail="缺少 WECHAT_APPID / WECHAT_SECRET 配置")

        token, token_from_cache, token_err_body = await self._get_access_token(acct["appid"], acct["secret"])
        if not token:
            detail = _detail_for_token_error(token_err_body)
            logger.error("wechat access_token_unavailable rid=%s detail=%s", rid, detail[:200])
            return PublishResponse(ok=False, detail=detail, data=token_err_body)
        logger.info(
            "wechat_token rid=%s cache_hit=%s",
            rid,
            token_from_cache,
        )

        # A media id supplied with the request wins over config / auto-upload.
        req_thumb = (req.thumb_media_id or "").strip()
        if req_thumb:
            logger.info("wechat_thumb rid=%s source=request_media_id", rid)
            thumb_id = req_thumb
        else:
            thumb_id = await self._resolve_thumb_media_id(token, rid, account=acct)
        if not thumb_id:
            return PublishResponse(
                ok=False,
                detail=(
                    "无法上传封面素材(material/add_material 失败)。"
                    "请检查公众号是否开通素材接口权限,或手动在素材库上传后配置 WECHAT_THUMB_MEDIA_ID。"
                ),
            )

        html = markdown2.markdown(req.body_markdown)
        title = req.title or ""
        logger.info(
            "wechat_draft_build rid=%s title_chars=%d digest_chars=%d html_chars=%d",
            rid,
            len(title),
            len(req.summary or ""),
            len(html or ""),
        )
        # "news" articles require thumb_media_id (a permanent material);
        # without a valid one WeChat answers errcode=40007.
        payload = {
            "articles": [
                {
                    "article_type": "news",
                    "title": title[:32],
                    # acct["author"] already falls back to settings.wechat_author;
                    # ending the chain with "" keeps the slice safe when all are unset.
                    "author": (req.author or acct["author"] or "")[:16],
                    "digest": (req.summary or "")[:128],
                    "content": html,
                    "content_source_url": "",
                    "thumb_media_id": thumb_id,
                    "need_open_comment": 0,
                    "only_fans_can_comment": 0,
                }
            ]
        }
        explicit_used = bool(acct["thumb_media_id"])
        draft_url = f"https://api.weixin.qq.com/cgi-bin/draft/add?access_token={token}"
        async with httpx.AsyncClient(timeout=25) as client:
            logger.info(
                "wechat_http_post rid=%s endpoint=cgi-bin/draft/add http_timeout_s=25",
                rid,
            )
            r = await _post_json_utf8(client, draft_url, payload)
            data = _json_object(r)
            if data is not None and data.get("errcode") == 40007 and explicit_used:
                # The configured media id was rejected: drop this account's
                # cached cover and retry once with a freshly uploaded one.
                logger.warning(
                    "wechat_draft_40007_retry rid=%s hint=config_media_id_invalid_try_auto_upload",
                    rid,
                )
                self._runtime_thumb_media_ids.pop(acct["appid"], None)
                thumb_alt = await self._resolve_thumb_media_id(
                    token, rid, force_skip_explicit=True, account=acct
                )
                if thumb_alt:
                    payload["articles"][0]["thumb_media_id"] = thumb_alt
                    r = await _post_json_utf8(client, draft_url, payload)
                    data = _json_object(r)

        if data is None:
            # Not a JSON object (e.g. a gateway error page): report a failure
            # rather than raising or mistaking it for success.
            logger.warning(
                "wechat_draft_failed rid=%s http_status=%s reason=non_json_body",
                rid,
                r.status_code,
            )
            return PublishResponse(
                ok=False,
                detail=f"微信发布失败: HTTP {r.status_code},响应不是 JSON 对象",
            )
        if data.get("errcode", 0) != 0:
            logger.warning(
                "wechat_draft_failed rid=%s errcode=%s errmsg=%s raw=%s",
                rid,
                data.get("errcode"),
                data.get("errmsg"),
                data,
            )
            return PublishResponse(ok=False, detail=_detail_for_draft_error(data), data=data)
        logger.info(
            "wechat_draft_ok rid=%s media_id=%s",
            rid,
            data.get("media_id", data),
        )
        return PublishResponse(ok=True, detail="已发布到公众号草稿箱", data=data)

    async def upload_cover(
        self, filename: str, content: bytes, request_id: str = "", account: dict | None = None
    ) -> PublishResponse:
        """Upload a cover image as a permanent material and return its id.

        On success the media id is also remembered for this account so that
        later drafts without an explicit thumbnail reuse it.

        Returns:
            ``PublishResponse`` whose ``data`` holds ``thumb_media_id`` and
            ``filename`` on success; ``ok=False`` with a ``detail`` otherwise.
        """
        rid = request_id or "-"
        acct = self._resolve_account(account)
        if not acct["appid"] or not acct["secret"]:
            return PublishResponse(ok=False, detail="缺少 WECHAT_APPID / WECHAT_SECRET 配置")
        if not content:
            return PublishResponse(ok=False, detail="封面文件为空")
        token, _, token_err_body = await self._get_access_token(acct["appid"], acct["secret"])
        if not token:
            return PublishResponse(ok=False, detail=_detail_for_token_error(token_err_body), data=token_err_body)
        async with httpx.AsyncClient(timeout=60) as client:
            material = await self._upload_permanent_image(client, token, content, filename)
        if not material:
            return PublishResponse(
                ok=False,
                detail="封面上传失败:请检查图片格式/大小,或查看日志中的 wechat_material_add_failed",
            )
        mid = material["media_id"]
        self._runtime_thumb_media_ids[acct["appid"]] = mid
        logger.info("wechat_cover_upload_ok rid=%s filename=%s media_id=%s", rid, filename, mid)
        return PublishResponse(ok=True, detail="封面上传成功", data={"thumb_media_id": mid, "filename": filename})

    async def upload_body_material(
        self, filename: str, content: bytes, request_id: str = "", account: dict | None = None
    ) -> PublishResponse:
        """Upload an in-article image as a permanent material.

        Returns:
            ``PublishResponse`` whose ``data`` holds the ``media_id`` and the
            ``url`` that can be embedded in the article body on success;
            ``ok=False`` with a ``detail`` otherwise.
        """
        rid = request_id or "-"
        acct = self._resolve_account(account)
        if not acct["appid"] or not acct["secret"]:
            return PublishResponse(ok=False, detail="缺少 WECHAT_APPID / WECHAT_SECRET 配置")
        if not content:
            return PublishResponse(ok=False, detail="素材文件为空")
        token, _, token_err_body = await self._get_access_token(acct["appid"], acct["secret"])
        if not token:
            return PublishResponse(ok=False, detail=_detail_for_token_error(token_err_body), data=token_err_body)
        async with httpx.AsyncClient(timeout=60) as client:
            material = await self._upload_permanent_image(client, token, content, filename)
        if not material:
            return PublishResponse(
                ok=False,
                detail="素材上传失败:请检查图片格式/大小,或查看日志中的 wechat_material_add_failed",
            )
        logger.info(
            "wechat_body_material_upload_ok rid=%s filename=%s media_id=%s url=%s",
            rid,
            filename,
            material.get("media_id"),
            material.get("url"),
        )
        return PublishResponse(ok=True, detail="素材上传成功", data=material)

    async def _upload_permanent_image(
        self, client: httpx.AsyncClient, token: str, content: bytes, filename: str
    ) -> dict[str, str] | None:
        """POST an image to ``material/add_material`` (type=image).

        The content type is ``image/png`` for ``.png`` filenames and
        ``image/jpeg`` for everything else.

        Returns:
            ``{"media_id": ..., "url": ...}`` on success, or ``None`` when
            WeChat reports an errcode, omits ``media_id``, or answers with
            something that is not a JSON object (all logged as warnings).
        """
        url = f"https://api.weixin.qq.com/cgi-bin/material/add_material?access_token={token}&type=image"
        ctype = "image/png" if filename.lower().endswith(".png") else "image/jpeg"
        files = {"media": (filename, content, ctype)}
        r = await client.post(url, files=files)
        data = _json_object(r)
        if data is None:
            logger.warning(
                "wechat_material_add_failed http_status=%s reason=non_json_body",
                r.status_code,
            )
            return None
        if data.get("errcode"):
            logger.warning("wechat_material_add_failed body=%s", data)
            return None
        mid = data.get("media_id")
        if not mid:
            logger.warning("wechat_material_add_no_media_id body=%s", data)
            return None
        return {"media_id": mid, "url": data.get("url") or ""}

    async def _resolve_thumb_media_id(
        self, token: str, rid: str, *, force_skip_explicit: bool = False, account: dict | None = None
    ) -> str | None:
        """Pick or create the permanent cover material for ``draft/add``.

        Resolution order: configured ``thumb_media_id``, then the cover
        cached for this appid, then an upload of ``thumb_image_path``, then
        an upload of the built-in default JPEG.

        Args:
            token: Valid access token for the account.
            rid: Correlation id for log lines.
            force_skip_explicit: Skip both the configured id and the cached
                id and go straight to uploading (used after a 40007).
            account: Optional account override; resolved against settings.

        Returns:
            A media id, or ``None`` when every upload attempt failed.
        """
        acct = self._resolve_account(account)
        appid = acct["appid"]
        if not force_skip_explicit:
            explicit = acct["thumb_media_id"]
            if explicit:
                logger.info("wechat_thumb rid=%s source=config_media_id", rid)
                return explicit
            cached = self._runtime_thumb_media_ids.get(appid)
            if cached:
                logger.info("wechat_thumb rid=%s source=runtime_cache", rid)
                return cached

        path = acct["thumb_image_path"]
        async with httpx.AsyncClient(timeout=60) as client:
            if path:
                p = Path(path)
                if p.is_file():
                    content = p.read_bytes()
                    material = await self._upload_permanent_image(client, token, content, p.name)
                    if material:
                        self._runtime_thumb_media_ids[appid] = material["media_id"]
                        logger.info("wechat_thumb rid=%s source=path_upload ok=1", rid)
                        return material["media_id"]
                    logger.warning("wechat_thumb rid=%s source=path_upload ok=0 path=%s", rid, path)
                else:
                    logger.warning("wechat_thumb rid=%s path_not_found=%s", rid, path)

            # Last resort: the built-in plain cover.
            content, fname = _build_default_cover_jpeg()
            material = await self._upload_permanent_image(client, token, content, fname)
            if material:
                self._runtime_thumb_media_ids[appid] = material["media_id"]
                logger.info("wechat_thumb rid=%s source=default_jpeg_upload ok=1", rid)
                return material["media_id"]
        logger.error("wechat_thumb rid=%s source=default_jpeg_upload ok=0", rid)
        return None

    async def _get_access_token(self, appid: str, secret: str) -> tuple[str | None, bool, dict | None]:
        """Return a cached or freshly fetched access token for ``appid``.

        Returns:
            ``(token, from_cache, error_body)``. On success ``error_body`` is
            ``None``. On failure ``token`` is ``None`` and ``error_body`` is
            the JSON object WeChat returned (with errcode/errmsg), ``{}`` for
            an empty response body, or ``None`` when the body was not a JSON
            object.
        """
        now = int(time.time())
        key = appid.strip()
        cached = self._token_cache.get(key)
        if cached:
            token = str(cached.get("token") or "")
            exp = int(cached.get("expires_at") or 0)
            # Refresh 60 s early so a token does not expire mid-request.
            if token and now < exp - 60:
                return token, True, None

        logger.info("wechat_http_get endpoint=cgi-bin/token reason=refresh_access_token")
        async with httpx.AsyncClient(timeout=20) as client:
            r = await client.get(
                "https://api.weixin.qq.com/cgi-bin/token",
                params={
                    "grant_type": "client_credential",
                    "appid": appid,
                    "secret": secret,
                },
            )
        data = _json_object(r) if r.content else {}
        token = data.get("access_token") if data else None
        if not token:
            logger.warning(
                "wechat_token_refresh_failed http_status=%s body=%s",
                r.status_code,
                data,
            )
            return None, False, data
        # `or 7200` also covers an explicit null / 0 in the response.
        self._token_cache[key] = {"token": token, "expires_at": now + int(data.get("expires_in") or 7200)}
        return token, False, None