This commit is contained in:
Daniel
2026-04-28 11:50:55 +08:00
parent 1bbabc2a78
commit 2724e69b4f
20 changed files with 3881 additions and 554 deletions

View File

@@ -0,0 +1,473 @@
from __future__ import annotations
import asyncio
import base64
import logging
import re
import textwrap
from io import BytesIO
from pathlib import Path
import httpx
from openai import OpenAI
from PIL import Image, ImageDraw, ImageFont
from app.config import settings
from app.schemas import (
CoverGenerateResponse,
PosterGenerateRequest,
PosterGenerateResponse,
PosterPreviewItem,
WechatCoverGenerateRequest,
)
from app.services.wechat import WechatPublisher
logger = logging.getLogger(__name__)
_FONT_CANDIDATES = [
"/System/Library/Fonts/PingFang.ttc",
"/System/Library/Fonts/Hiragino Sans GB.ttc",
"/Library/Fonts/Arial Unicode.ttf",
]
def _split_paragraphs(body_markdown: str) -> list[str]:
raw = (body_markdown or "").replace("\r\n", "\n").strip()
if not raw:
return []
return [p.strip() for p in re.split(r"\n\s*\n+", raw) if p.strip()]
def _pick_font(size: int) -> ImageFont.ImageFont:
for p in _FONT_CANDIDATES:
if Path(p).is_file():
try:
return ImageFont.truetype(p, size=size)
except Exception:
continue
return ImageFont.load_default()
def _to_jpeg_under_limit(content: bytes, max_bytes: int) -> bytes:
im = Image.open(BytesIO(content)).convert("RGB")
widths = [1080, 1024, 960, 900, 840, 780, 720, 660]
qualities = [88, 82, 76, 70, 64, 58, 52]
for w in widths:
if im.width > w:
h = max(1, int(im.height * (w / im.width)))
cur = im.resize((w, h), Image.Resampling.LANCZOS)
else:
cur = im
for q in qualities:
buf = BytesIO()
cur.save(buf, format="JPEG", quality=q, optimize=True)
out = buf.getvalue()
if len(out) <= max_bytes:
return out
buf = BytesIO()
h = max(1, int(im.height * (640 / im.width)))
im.resize((640, h), Image.Resampling.LANCZOS).save(buf, format="JPEG", quality=48, optimize=True)
return buf.getvalue()
def _cover_to_jpeg(
content: bytes,
max_bytes: int,
size: tuple[int, int] = (900, 383),
title: str = "",
summary: str = "",
overlay_title: bool = False,
) -> bytes:
im = Image.open(BytesIO(content)).convert("RGB")
target_w, target_h = size
src_ratio = im.width / max(1, im.height)
dst_ratio = target_w / target_h
if src_ratio > dst_ratio:
new_w = int(im.height * dst_ratio)
x0 = max(0, (im.width - new_w) // 2)
im = im.crop((x0, 0, x0 + new_w, im.height))
elif src_ratio < dst_ratio:
new_h = int(im.width / dst_ratio)
y0 = max(0, (im.height - new_h) // 2)
im = im.crop((0, y0, im.width, y0 + new_h))
im = im.resize(size, Image.Resampling.LANCZOS)
if overlay_title:
im = _draw_cover_text_overlay(im, title, summary)
for q in [92, 88, 84, 80, 76, 72, 68, 62]:
buf = BytesIO()
im.save(buf, format="JPEG", quality=q, optimize=True, progressive=True)
out = buf.getvalue()
if len(out) <= max_bytes:
return out
buf = BytesIO()
im.save(buf, format="JPEG", quality=58, optimize=True)
return buf.getvalue()
def _draw_cover_text_overlay(im: Image.Image, title: str, summary: str) -> Image.Image:
im = im.convert("RGBA")
overlay = Image.new("RGBA", im.size, (0, 0, 0, 0))
draw = ImageDraw.Draw(overlay)
draw.rounded_rectangle([42, 46, 572, 337], radius=30, fill=(255, 255, 255, 228))
draw.rounded_rectangle([70, 74, 222, 119], radius=18, fill=(31, 111, 91, 245))
tag_font = _pick_font(26)
title_font = _pick_font(46)
summary_font = _pick_font(24)
draw.text((94, 83), "公众号封面", font=tag_font, fill=(255, 255, 255, 255))
clean_title = re.sub(r"\s+", "", title or "公众号文章")
title_lines = textwrap.wrap(clean_title, width=12)[:2]
y = 146
for line in title_lines:
draw.text((72, y), line, font=title_font, fill=(23, 32, 51, 255))
y += 56
clean_summary = re.sub(r"\s+", "", summary or "一眼看懂主题,明确文章价值。")
summary_lines = textwrap.wrap(clean_summary, width=23)[:2]
sy = max(y + 10, 270)
for line in summary_lines:
draw.text((74, sy), line, font=summary_font, fill=(104, 115, 133, 255))
sy += 30
return Image.alpha_composite(im, overlay).convert("RGB")
class PosterMaterialService:
def __init__(self, wechat: WechatPublisher) -> None:
self._wechat = wechat
self._image_client = None
if settings.openai_api_key:
self._image_client = OpenAI(
api_key=settings.openai_api_key,
base_url=settings.openai_base_url,
timeout=settings.openai_timeout,
max_retries=max(0, int(settings.openai_max_retries)),
)
async def generate(
self,
req: PosterGenerateRequest,
request_id: str = "",
account: dict | None = None,
) -> PosterGenerateResponse:
rid = request_id or "-"
paragraphs = _split_paragraphs(req.body_markdown)
if len(paragraphs) <= 1:
return PosterGenerateResponse(
ok=True,
detail="正文不足两段:按规则首段不生成图片,因此无需海报。",
posters=[],
body_markdown_with_posters=req.body_markdown,
warnings=[],
)
max_images = max(1, min(int(req.max_images or settings.poster_max_images), 12))
posters: list[PosterPreviewItem] = []
warnings: list[str] = []
wechat_urls_by_para: dict[int, str] = {}
for idx, paragraph in enumerate(paragraphs):
if idx == 0:
continue
if len(posters) >= max_images:
break
prompt = self._build_prompt(req, paragraph, idx, len(paragraphs))
jpeg_bytes, note = await asyncio.to_thread(self._create_poster_jpeg, prompt, paragraph, idx)
preview_data_url = "data:image/jpeg;base64," + base64.b64encode(jpeg_bytes).decode("ascii")
wechat_url = ""
uploaded = False
if req.upload_to_wechat:
if not account:
warnings.append("未绑定公众号:已生成海报预览,但未上传微信素材 URL。")
else:
filename = f"poster_p{idx + 1}.jpg"
out = await self._wechat.upload_article_image(
filename,
jpeg_bytes,
request_id=rid,
account=account,
)
if out.ok:
wechat_url = ((out.data or {}).get("url") or "").strip()
uploaded = bool(wechat_url)
if uploaded:
wechat_urls_by_para[idx] = wechat_url
else:
warnings.append(f"{idx + 1} 段海报上传失败:{out.detail}")
posters.append(
PosterPreviewItem(
paragraph_index=idx,
paragraph_excerpt=textwrap.shorten(paragraph.replace("\n", " "), width=80, placeholder=""),
prompt=prompt,
preview_data_url=preview_data_url,
wechat_url=wechat_url,
uploaded=uploaded,
note=note,
)
)
merged = req.body_markdown
if wechat_urls_by_para:
merged = self._merge_body_with_posters(paragraphs, wechat_urls_by_para)
detail = f"已生成 {len(posters)} 张段落海报(首段跳过)"
if req.upload_to_wechat:
detail += f",成功上传 {sum(1 for p in posters if p.uploaded)}"
logger.info(
"poster_generate rid=%s posters=%d upload_to_wechat=%s uploaded=%d warnings=%d",
rid,
len(posters),
req.upload_to_wechat,
sum(1 for p in posters if p.uploaded),
len(warnings),
)
return PosterGenerateResponse(
ok=True,
detail=detail,
posters=posters,
body_markdown_with_posters=merged,
warnings=warnings,
)
async def generate_cover(
self,
req: WechatCoverGenerateRequest,
request_id: str = "",
account: dict | None = None,
) -> CoverGenerateResponse:
rid = request_id or "-"
title = (req.title or "").strip()
summary = (req.summary or "").strip()
if not title:
return CoverGenerateResponse(ok=False, detail="请先填写标题,或先完成改写生成标题")
prompt = self._build_cover_prompt(req)
jpeg_bytes, note = await asyncio.to_thread(self._create_cover_jpeg, prompt, title, summary)
preview_data_url = "data:image/jpeg;base64," + base64.b64encode(jpeg_bytes).decode("ascii")
warnings: list[str] = []
thumb_media_id = ""
if req.upload_to_wechat:
if not account:
warnings.append("未绑定公众号:已生成封面预览,但未上传为微信封面素材。")
else:
out = await self._wechat.upload_cover("wechat_cover_900x383.jpg", jpeg_bytes, request_id=rid, account=account)
if out.ok:
thumb_media_id = ((out.data or {}).get("thumb_media_id") or "").strip()
else:
warnings.append(f"封面上传失败:{out.detail}")
detail = "已生成公众号封面900×383"
if thumb_media_id:
detail += ",并已绑定 thumb_media_id"
elif warnings:
detail += ",但未完成微信绑定"
logger.info(
"cover_generate rid=%s title_chars=%d upload_to_wechat=%s uploaded=%s note=%s warnings=%d",
rid,
len(title),
req.upload_to_wechat,
bool(thumb_media_id),
note,
len(warnings),
)
return CoverGenerateResponse(
ok=True,
detail=detail,
preview_data_url=preview_data_url,
thumb_media_id=thumb_media_id,
width=900,
height=383,
note=note,
warnings=warnings,
)
def _build_cover_prompt(self, req: WechatCoverGenerateRequest) -> str:
title = (req.title or "公众号文章").strip()
summary = (req.summary or "").strip()
style_hint = (req.style_hint or "").strip() or "成熟公众号封面,清晰、克制、信息强,适合作为文章列表首图"
return (
"生成一张微信公众号文章封面图,最终会裁切为 900x383 横版比例。"
f"封面主标题:{title}"
f"文章摘要:{summary}"
f"风格要求:{style_hint}"
"画面要突出封面的点击引导作用:主题明确、视觉焦点强、留出标题安全区、中文字少且清晰。"
"不要出现二维码、水印、品牌 logo、真人肖像、杂乱小字和侵权素材。"
)
def _build_prompt(self, req: PosterGenerateRequest, paragraph: str, idx: int, total: int) -> str:
title = (req.title or "公众号内容").strip()
summary = (req.summary or "").strip()
style_hint = (req.style_hint or "").strip() or "现代、干净、中文可读、公众号海报风格"
para = paragraph.strip()
return (
"请生成一张中文竖版海报,适合公众号正文插图。"
f"主题标题:{title}"
f"这是第 {idx + 1}/{total} 段对应海报(首段不配图)。"
f"段落核心内容:{para}"
f"摘要参考:{summary}"
f"风格要求:{style_hint}"
"画面需信息聚焦、可读性强不要出现水印、二维码、logo、真人肖像。"
)
def _create_poster_jpeg(self, prompt: str, paragraph: str, idx: int) -> tuple[bytes, str]:
max_bytes = max(300_000, int(settings.poster_upload_max_bytes or 950_000))
if self._image_client:
try:
raw = self._generate_with_model(prompt)
if raw:
return _to_jpeg_under_limit(raw, max_bytes), "ai"
except Exception as exc:
logger.warning("poster_ai_failed detail=%s", str(exc)[:240])
fallback = self._generate_fallback_poster(paragraph, idx)
return _to_jpeg_under_limit(fallback, max_bytes), "fallback"
def _create_cover_jpeg(self, prompt: str, title: str, summary: str) -> tuple[bytes, str]:
max_bytes = max(300_000, int(settings.poster_upload_max_bytes or 950_000))
if self._image_client:
try:
raw = self._generate_with_model(prompt)
if raw:
return _cover_to_jpeg(raw, max_bytes, title=title, summary=summary, overlay_title=True), "ai_900x383"
except Exception as exc:
logger.warning("cover_ai_failed detail=%s", str(exc)[:240])
fallback = self._generate_fallback_cover(title, summary)
return _cover_to_jpeg(fallback, max_bytes), "fallback_900x383"
def _generate_with_model(self, prompt: str) -> bytes | None:
rsp = self._image_client.images.generate(
model=settings.openai_image_model,
prompt=prompt,
size=settings.poster_image_size,
)
data = getattr(rsp, "data", None) or []
if not data:
return None
first = data[0]
b64 = ""
image_url = ""
if isinstance(first, dict):
b64 = (first.get("b64_json") or "").strip()
image_url = (first.get("url") or "").strip()
else:
b64 = (getattr(first, "b64_json", "") or "").strip()
image_url = (getattr(first, "url", "") or "").strip()
if b64:
return base64.b64decode(b64)
if image_url:
with httpx.Client(timeout=30) as client:
r = client.get(image_url)
r.raise_for_status()
return r.content
return None
def _generate_fallback_poster(self, paragraph: str, idx: int) -> bytes:
w, h = 1080, 1520
im = Image.new("RGB", (w, h), (240, 246, 255))
draw = ImageDraw.Draw(im)
for y in range(h):
c = int(240 - (y / h) * 36)
draw.line([(0, y), (w, y)], fill=(c, c + 6, 255), width=1)
for i in range(8):
x0 = int(w * 0.08) + i * 54
y0 = int(h * 0.66) + i * 22
x1 = x0 + 260
y1 = y0 + 100
color = (160 - i * 8, 190 - i * 9, 230 - i * 8)
draw.rounded_rectangle([x0, y0, x1, y1], radius=24, outline=color, width=2)
tag_font = _pick_font(36)
title_font = _pick_font(58)
body_font = _pick_font(42)
draw.rounded_rectangle([70, 70, 340, 142], radius=20, fill=(31, 77, 185))
draw.text((102, 90), f"段落 {idx + 1}", font=tag_font, fill=(255, 255, 255))
draw.text((70, 190), "AI 图文海报", font=title_font, fill=(16, 42, 102))
words = re.sub(r"\s+", "", paragraph)
if len(words) > 120:
words = words[:120] + ""
wrapped = textwrap.fill(words, width=19)
draw.multiline_text(
(72, 330),
wrapped,
font=body_font,
fill=(35, 54, 92),
spacing=14,
align="left",
)
buf = BytesIO()
im.save(buf, format="PNG")
return buf.getvalue()
def _generate_fallback_cover(self, title: str, summary: str) -> bytes:
w, h = 900, 383
im = Image.new("RGB", (w, h), (247, 249, 252))
draw = ImageDraw.Draw(im)
for y in range(h):
t = y / h
r = int(252 - t * 28)
g = int(250 - t * 18)
b = int(241 - t * 8)
draw.line([(0, y), (w, y)], fill=(r, g, b), width=1)
draw.rounded_rectangle([36, 34, 864, 349], radius=34, fill=(255, 255, 255), outline=(223, 229, 238), width=2)
draw.rounded_rectangle([604, 60, 830, 290], radius=34, fill=(238, 244, 241))
draw.ellipse([660, 95, 810, 245], fill=(229, 196, 122))
draw.ellipse([690, 126, 780, 216], fill=(255, 250, 229))
draw.arc([684, 118, 784, 226], start=20, end=168, fill=(199, 159, 81), width=5)
tag_font = _pick_font(28)
title_font = _pick_font(48)
summary_font = _pick_font(24)
small_font = _pick_font(20)
draw.rounded_rectangle([72, 70, 226, 118], radius=18, fill=(31, 111, 91))
draw.text((96, 80), "公众号封面", font=tag_font, fill=(255, 255, 255))
clean_title = re.sub(r"\s+", "", title or "公众号文章")
title_lines = textwrap.wrap(clean_title, width=12)[:2]
y = 146
for line in title_lines:
draw.text((72, y), line, font=title_font, fill=(23, 32, 51))
y += 58
clean_summary = re.sub(r"\s+", "", summary or "清晰表达主题,让读者一眼知道文章价值。")
summary_lines = textwrap.wrap(clean_summary, width=24)[:2]
sy = max(y + 12, 268)
for line in summary_lines:
draw.text((74, sy), line, font=summary_font, fill=(104, 115, 133))
sy += 32
draw.text((72, 320), "900 x 383", font=small_font, fill=(140, 150, 166))
buf = BytesIO()
im.save(buf, format="PNG")
return buf.getvalue()
def _merge_body_with_posters(self, paragraphs: list[str], wechat_urls_by_para: dict[int, str]) -> str:
merged: list[str] = []
for idx, para in enumerate(paragraphs):
if idx > 0:
url = (wechat_urls_by_para.get(idx) or "").strip()
if url:
merged.append(f"![段落配图 {idx + 1}]({url})")
merged.append(para)
return "\n\n".join(merged)