474 lines
18 KiB
Python
474 lines
18 KiB
Python
from __future__ import annotations
|
||
|
||
import asyncio
|
||
import base64
|
||
import logging
|
||
import re
|
||
import textwrap
|
||
from io import BytesIO
|
||
from pathlib import Path
|
||
|
||
import httpx
|
||
from openai import OpenAI
|
||
from PIL import Image, ImageDraw, ImageFont
|
||
|
||
from app.config import settings
|
||
from app.schemas import (
|
||
CoverGenerateResponse,
|
||
PosterGenerateRequest,
|
||
PosterGenerateResponse,
|
||
PosterPreviewItem,
|
||
WechatCoverGenerateRequest,
|
||
)
|
||
from app.services.wechat import WechatPublisher
|
||
|
||
logger = logging.getLogger(__name__)

# Font files able to render Chinese text, tried in order by ``_pick_font``;
# the first path that exists and loads is used.  The macOS entries stay first
# so hosts that already had a match keep the same typeface.  The Linux and
# Windows entries are common install locations; any that are absent are
# simply skipped by the ``is_file`` check in ``_pick_font``.
_FONT_CANDIDATES = [
    # macOS
    "/System/Library/Fonts/PingFang.ttc",
    "/System/Library/Fonts/Hiragino Sans GB.ttc",
    "/Library/Fonts/Arial Unicode.ttf",
    # Linux (Noto CJK / WenQuanYi packages)
    "/usr/share/fonts/opentype/noto/NotoSansCJK-Regular.ttc",
    "/usr/share/fonts/truetype/noto/NotoSansCJK-Regular.ttc",
    "/usr/share/fonts/google-noto-cjk/NotoSansCJK-Regular.ttc",
    "/usr/share/fonts/truetype/wqy/wqy-microhei.ttc",
    "/usr/share/fonts/truetype/wqy/wqy-zenhei.ttc",
    # Windows
    "C:/Windows/Fonts/msyh.ttc",
    "C:/Windows/Fonts/simhei.ttf",
]
|
||
|
||
|
||
def _split_paragraphs(body_markdown: str) -> list[str]:
    """Split Markdown text into paragraphs separated by blank lines.

    Windows line endings are normalised first.  A "blank line" may contain
    whitespace.  Each paragraph is stripped and empty pieces are dropped.

    Args:
        body_markdown: Raw article body; ``None`` or empty is accepted.

    Returns:
        The non-empty paragraphs in document order (``[]`` for blank input).
    """
    normalized = (body_markdown or "").replace("\r\n", "\n").strip()
    if not normalized:
        return []

    pieces = (piece.strip() for piece in re.split(r"\n\s*\n+", normalized))
    return [piece for piece in pieces if piece]
|
||
|
||
|
||
def _pick_font(size: int) -> ImageFont.FreeTypeFont | ImageFont.ImageFont:
    """Return a font for drawing text at roughly ``size`` pixels.

    Each path in ``_FONT_CANDIDATES`` is tried in order; the first one that
    exists on disk and loads successfully is returned.  When none is usable,
    Pillow's built-in default font is returned instead.

    Args:
        size: Requested font size in pixels.

    Returns:
        A loaded TrueType font, or Pillow's default font as a last resort.
    """
    for candidate in _FONT_CANDIDATES:
        if not Path(candidate).is_file():
            continue
        try:
            return ImageFont.truetype(candidate, size=size)
        except Exception:
            # Best effort: an unreadable or unsupported font file must not
            # abort rendering, so move on to the next candidate.
            continue

    try:
        # Newer Pillow releases accept ``size`` here and return a scalable
        # default font, so the fallback text is not stuck at the tiny
        # bitmap size.
        return ImageFont.load_default(size=size)
    except Exception:
        # Older Pillow (no ``size`` parameter) or no FreeType support.
        return ImageFont.load_default()
|
||
|
||
|
||
def _to_jpeg_under_limit(content: bytes, max_bytes: int) -> bytes:
    """Re-encode an image as JPEG, shrinking it until it fits ``max_bytes``.

    The image is converted to RGB and tried at progressively smaller widths
    (never upscaled).  At each width it is encoded at progressively lower
    JPEG quality.  The first encoding that fits is returned.

    Args:
        content: Encoded source image in any format Pillow can open.
        max_bytes: Target upper bound for the encoded size.

    Returns:
        JPEG bytes.  If nothing in the ladder fits, a last-resort encoding
        (at most 640 px wide, quality 48) is returned, which may still
        exceed ``max_bytes``.
    """
    im = Image.open(BytesIO(content)).convert("RGB")
    widths = (1080, 1024, 960, 900, 840, 780, 720, 660)
    qualities = (88, 82, 76, 70, 64, 58, 52)

    def scaled(width: int) -> Image.Image:
        """Return ``im`` at most ``width`` pixels wide, keeping aspect ratio."""
        if im.width <= width:
            return im
        height = max(1, int(im.height * (width / im.width)))
        return im.resize((width, height), Image.Resampling.LANCZOS)

    def encode(picture: Image.Image, quality: int) -> bytes:
        """Encode ``picture`` as an optimised JPEG at ``quality``."""
        buf = BytesIO()
        picture.save(buf, format="JPEG", quality=quality, optimize=True)
        return buf.getvalue()

    tried_widths: set[int] = set()
    for width in widths:
        effective = min(width, im.width)
        if effective in tried_widths:
            # The source is narrower than several targets: these are the
            # same pixels that already failed the whole quality ladder.
            continue
        tried_widths.add(effective)
        candidate = scaled(width)
        for quality in qualities:
            out = encode(candidate, quality)
            if len(out) <= max_bytes:
                return out

    # Last resort; ``scaled`` keeps narrow images at their own size instead
    # of enlarging them to 640 px.
    return encode(scaled(640), 48)
|
||
|
||
|
||
def _cover_to_jpeg(
    content: bytes,
    max_bytes: int,
    size: tuple[int, int] = (900, 383),
    title: str = "",
    summary: str = "",
    overlay_title: bool = False,
) -> bytes:
    """Centre-crop an image to the cover aspect ratio and encode it as JPEG.

    The source is converted to RGB, cropped around its centre to the aspect
    ratio of ``size``, resized to exactly ``size``, optionally given a text
    overlay, and then encoded at decreasing quality until it fits.

    Args:
        content: Encoded source image in any format Pillow can open.
        max_bytes: Target upper bound for the encoded size.
        size: Output ``(width, height)`` in pixels.
        title: Title passed to the overlay when ``overlay_title`` is set.
        summary: Summary passed to the overlay when ``overlay_title`` is set.
        overlay_title: Whether to draw the title/summary card on the image.

    Returns:
        JPEG bytes.  If no quality step fits, a quality-58 encoding is
        returned, which may still exceed ``max_bytes``.
    """
    im = Image.open(BytesIO(content)).convert("RGB")
    target_w, target_h = size
    src_ratio = im.width / max(1, im.height)
    dst_ratio = target_w / target_h

    if src_ratio > dst_ratio:
        # Source is too wide: trim equal amounts from left and right.
        new_w = max(1, int(im.height * dst_ratio))
        x0 = max(0, (im.width - new_w) // 2)
        im = im.crop((x0, 0, x0 + new_w, im.height))
    elif src_ratio < dst_ratio:
        # Source is too tall: trim equal amounts from top and bottom.
        # Clamp to 1 px so a very narrow source cannot yield an empty crop.
        new_h = max(1, int(im.width / dst_ratio))
        y0 = max(0, (im.height - new_h) // 2)
        im = im.crop((0, y0, im.width, y0 + new_h))

    im = im.resize(size, Image.Resampling.LANCZOS)
    if overlay_title:
        im = _draw_cover_text_overlay(im, title, summary)

    def encode(quality: int, **save_options: bool) -> bytes:
        """Encode the finished cover as an optimised JPEG at ``quality``."""
        buf = BytesIO()
        im.save(buf, format="JPEG", quality=quality, optimize=True, **save_options)
        return buf.getvalue()

    for quality in (92, 88, 84, 80, 76, 72, 68, 62):
        out = encode(quality, progressive=True)
        if len(out) <= max_bytes:
            return out

    # Last resort: one more quality step down, without the progressive flag.
    return encode(58)
|
||
|
||
|
||
def _draw_cover_text_overlay(im: Image.Image, title: str, summary: str) -> Image.Image:
    """Composite a translucent text card (tag, title, summary) onto a cover.

    All coordinates are fixed pixel positions.  Whitespace is removed from
    the title and summary before wrapping, and each is limited to two lines.

    Args:
        im: Cover image to draw on; it is not modified in place.
        title: Headline text; a placeholder is used when empty.
        summary: Secondary text; a placeholder is used when empty.

    Returns:
        A new RGB image with the overlay composited on top.
    """
    base = im.convert("RGBA")
    layer = Image.new("RGBA", base.size, (0, 0, 0, 0))
    pen = ImageDraw.Draw(layer)

    # White card first, then the green tag badge that sits on top of it.
    pen.rounded_rectangle([42, 46, 572, 337], radius=30, fill=(255, 255, 255, 228))
    pen.rounded_rectangle([70, 74, 222, 119], radius=18, fill=(31, 111, 91, 245))
    pen.text((94, 83), "公众号封面", font=_pick_font(26), fill=(255, 255, 255, 255))

    # Headline: at most two rows of 12 characters, 56 px apart.
    headline_font = _pick_font(46)
    headline = re.sub(r"\s+", "", title or "公众号文章")
    cursor_y = 146
    for row in textwrap.wrap(headline, width=12)[:2]:
        pen.text((72, cursor_y), row, font=headline_font, fill=(23, 32, 51, 255))
        cursor_y += 56

    # Summary: starts below the headline but never above y=270.
    blurb_font = _pick_font(24)
    blurb = re.sub(r"\s+", "", summary or "一眼看懂主题,明确文章价值。")
    blurb_top = max(cursor_y + 10, 270)
    for row_index, row in enumerate(textwrap.wrap(blurb, width=23)[:2]):
        pen.text(
            (74, blurb_top + row_index * 30),
            row,
            font=blurb_font,
            fill=(104, 115, 133, 255),
        )

    return Image.alpha_composite(base, layer).convert("RGB")
|
||
|
||
|
||
class PosterMaterialService:
    """Build paragraph posters and article covers, optionally uploading to WeChat.

    Images are requested from the configured OpenAI image model when an API
    key is set.  When no client is configured, the model returns nothing, or
    the attempt raises, a locally drawn Pillow image is used instead.
    """

    def __init__(self, wechat: WechatPublisher) -> None:
        """Store the publisher and create the image client if configured.

        Args:
            wechat: Publisher used for ``upload_article_image`` and
                ``upload_cover``.
        """
        self._wechat = wechat
        # Stays None without an API key; every image then uses the fallback.
        self._image_client: OpenAI | None = None
        if settings.openai_api_key:
            self._image_client = OpenAI(
                api_key=settings.openai_api_key,
                base_url=settings.openai_base_url,
                timeout=settings.openai_timeout,
                max_retries=max(0, int(settings.openai_max_retries)),
            )

    async def generate(
        self,
        req: PosterGenerateRequest,
        request_id: str = "",
        account: dict | None = None,
    ) -> PosterGenerateResponse:
        """Generate one poster per paragraph after the first.

        The first paragraph never gets a poster.  At most ``req.max_images``
        posters are produced (falling back to ``settings.poster_max_images``,
        clamped to 1..12).  When ``req.upload_to_wechat`` is set and an
        account is given, each poster is uploaded and the returned URLs are
        merged into the body as Markdown images.

        Args:
            req: Poster request (body, title, summary, style hint, flags).
            request_id: Correlation id for logging/upload; ``"-"`` if empty.
            account: WeChat account to upload with, or ``None``.

        Returns:
            A response with the poster previews, the (possibly merged) body
            and any warnings collected along the way.
        """
        rid = request_id or "-"
        paragraphs = _split_paragraphs(req.body_markdown)
        if len(paragraphs) <= 1:
            return PosterGenerateResponse(
                ok=True,
                detail="正文不足两段:按规则首段不生成图片,因此无需海报。",
                posters=[],
                body_markdown_with_posters=req.body_markdown,
                warnings=[],
            )

        max_images = max(1, min(int(req.max_images or settings.poster_max_images), 12))
        posters: list[PosterPreviewItem] = []
        warnings: list[str] = []
        wechat_urls_by_para: dict[int, str] = {}

        if req.upload_to_wechat and not account:
            # The condition is the same for every poster, so report it once
            # instead of once per generated image.
            warnings.append("未绑定公众号:已生成海报预览,但未上传微信素材 URL。")

        # Index 0 (the first paragraph) is skipped; the slice enforces the cap.
        for idx, paragraph in enumerate(paragraphs[1 : 1 + max_images], start=1):
            prompt = self._build_prompt(req, paragraph, idx, len(paragraphs))
            # Image generation is blocking (HTTP + Pillow); keep it off the loop.
            jpeg_bytes, note = await asyncio.to_thread(self._create_poster_jpeg, prompt, paragraph, idx)
            preview_data_url = "data:image/jpeg;base64," + base64.b64encode(jpeg_bytes).decode("ascii")

            wechat_url = ""
            if req.upload_to_wechat and account:
                wechat_url = await self._upload_poster(idx, jpeg_bytes, rid, account, warnings)
            if wechat_url:
                wechat_urls_by_para[idx] = wechat_url

            posters.append(
                PosterPreviewItem(
                    paragraph_index=idx,
                    paragraph_excerpt=self._excerpt(paragraph),
                    prompt=prompt,
                    preview_data_url=preview_data_url,
                    wechat_url=wechat_url,
                    uploaded=bool(wechat_url),
                    note=note,
                )
            )

        merged = req.body_markdown
        if wechat_urls_by_para:
            merged = self._merge_body_with_posters(paragraphs, wechat_urls_by_para)

        uploaded_count = sum(1 for p in posters if p.uploaded)
        detail = f"已生成 {len(posters)} 张段落海报(首段跳过)"
        if req.upload_to_wechat:
            detail += f",成功上传 {uploaded_count} 张"

        logger.info(
            "poster_generate rid=%s posters=%d upload_to_wechat=%s uploaded=%d warnings=%d",
            rid,
            len(posters),
            req.upload_to_wechat,
            uploaded_count,
            len(warnings),
        )
        return PosterGenerateResponse(
            ok=True,
            detail=detail,
            posters=posters,
            body_markdown_with_posters=merged,
            warnings=warnings,
        )

    async def _upload_poster(
        self,
        idx: int,
        jpeg_bytes: bytes,
        rid: str,
        account: dict,
        warnings: list[str],
    ) -> str:
        """Upload one poster; return its URL, or ``""`` if none was obtained.

        A failed upload appends a message to ``warnings``.
        """
        out = await self._wechat.upload_article_image(
            f"poster_p{idx + 1}.jpg",
            jpeg_bytes,
            request_id=rid,
            account=account,
        )
        if not out.ok:
            warnings.append(f"第 {idx + 1} 段海报上传失败:{out.detail}")
            return ""
        return ((out.data or {}).get("url") or "").strip()

    @staticmethod
    def _excerpt(paragraph: str, width: int = 80) -> str:
        """Return ``paragraph`` on one line, cut to ``width`` characters.

        Truncation is per character rather than per word: Chinese text has no
        spaces, and word-based shortening reduces a long run without spaces
        to just the placeholder.
        """
        collapsed = " ".join(paragraph.split())
        if len(collapsed) <= width:
            return collapsed
        return collapsed[: max(0, width - 1)] + "…"

    async def generate_cover(
        self,
        req: WechatCoverGenerateRequest,
        request_id: str = "",
        account: dict | None = None,
    ) -> CoverGenerateResponse:
        """Generate a 900x383 article cover and optionally upload it.

        Args:
            req: Cover request (title, summary, style hint, upload flag).
            request_id: Correlation id for logging/upload; ``"-"`` if empty.
            account: WeChat account to upload with, or ``None``.

        Returns:
            ``ok=False`` when the title is blank.  Otherwise a response with
            the preview data URL, the ``thumb_media_id`` (empty when not
            uploaded) and any warnings.
        """
        rid = request_id or "-"
        title = (req.title or "").strip()
        summary = (req.summary or "").strip()
        if not title:
            return CoverGenerateResponse(ok=False, detail="请先填写标题,或先完成改写生成标题")

        prompt = self._build_cover_prompt(req)
        # Image generation is blocking (HTTP + Pillow); keep it off the loop.
        jpeg_bytes, note = await asyncio.to_thread(self._create_cover_jpeg, prompt, title, summary)
        preview_data_url = "data:image/jpeg;base64," + base64.b64encode(jpeg_bytes).decode("ascii")

        warnings: list[str] = []
        thumb_media_id = ""
        if req.upload_to_wechat:
            if not account:
                warnings.append("未绑定公众号:已生成封面预览,但未上传为微信封面素材。")
            else:
                out = await self._wechat.upload_cover(
                    "wechat_cover_900x383.jpg",
                    jpeg_bytes,
                    request_id=rid,
                    account=account,
                )
                if out.ok:
                    thumb_media_id = ((out.data or {}).get("thumb_media_id") or "").strip()
                else:
                    warnings.append(f"封面上传失败:{out.detail}")

        detail = "已生成公众号封面(900×383)"
        if thumb_media_id:
            detail += ",并已绑定 thumb_media_id"
        elif warnings:
            detail += ",但未完成微信绑定"

        logger.info(
            "cover_generate rid=%s title_chars=%d upload_to_wechat=%s uploaded=%s note=%s warnings=%d",
            rid,
            len(title),
            req.upload_to_wechat,
            bool(thumb_media_id),
            note,
            len(warnings),
        )
        return CoverGenerateResponse(
            ok=True,
            detail=detail,
            preview_data_url=preview_data_url,
            thumb_media_id=thumb_media_id,
            width=900,
            height=383,
            note=note,
            warnings=warnings,
        )

    def _build_cover_prompt(self, req: WechatCoverGenerateRequest) -> str:
        """Compose the image-model prompt for an article cover.

        Blank (including whitespace-only) title or style hint fall back to
        built-in defaults.
        """
        # Strip before applying the default so "   " does not become "".
        title = (req.title or "").strip() or "公众号文章"
        summary = (req.summary or "").strip()
        style_hint = (req.style_hint or "").strip() or "成熟公众号封面,清晰、克制、信息强,适合作为文章列表首图"
        return (
            "生成一张微信公众号文章封面图,最终会裁切为 900x383 横版比例。"
            f"封面主标题:{title}。"
            f"文章摘要:{summary}。"
            f"风格要求:{style_hint}。"
            "画面要突出封面的点击引导作用:主题明确、视觉焦点强、留出标题安全区、中文字少且清晰。"
            "不要出现二维码、水印、品牌 logo、真人肖像、杂乱小字和侵权素材。"
        )

    def _build_prompt(self, req: PosterGenerateRequest, paragraph: str, idx: int, total: int) -> str:
        """Compose the image-model prompt for one paragraph poster.

        Args:
            req: Poster request supplying title, summary and style hint.
            paragraph: Paragraph text the poster illustrates.
            idx: Zero-based paragraph index (shown one-based in the prompt).
            total: Total number of paragraphs in the body.
        """
        # Strip before applying the default so "   " does not become "".
        title = (req.title or "").strip() or "公众号内容"
        summary = (req.summary or "").strip()
        style_hint = (req.style_hint or "").strip() or "现代、干净、中文可读、公众号海报风格"
        para = paragraph.strip()
        return (
            "请生成一张中文竖版海报,适合公众号正文插图。"
            f"主题标题:{title}。"
            f"这是第 {idx + 1}/{total} 段对应海报(首段不配图)。"
            f"段落核心内容:{para}。"
            f"摘要参考:{summary}。"
            f"风格要求:{style_hint}。"
            "画面需信息聚焦、可读性强,不要出现水印、二维码、logo、真人肖像。"
        )

    @staticmethod
    def _max_upload_bytes() -> int:
        """Return the JPEG size budget: the configured limit, at least 300 000."""
        return max(300_000, int(settings.poster_upload_max_bytes or 950_000))

    def _create_poster_jpeg(self, prompt: str, paragraph: str, idx: int) -> tuple[bytes, str]:
        """Produce poster JPEG bytes plus a note on where they came from.

        Returns:
            ``(jpeg, "ai")`` when the model produced an image, otherwise
            ``(jpeg, "fallback")`` using the locally drawn poster.
        """
        max_bytes = self._max_upload_bytes()

        if self._image_client:
            try:
                raw = self._generate_with_model(prompt)
                if raw:
                    return _to_jpeg_under_limit(raw, max_bytes), "ai"
            except Exception as exc:
                # Deliberate best effort: any model/download/decode failure
                # degrades to the local poster instead of failing the request.
                logger.warning("poster_ai_failed detail=%s", str(exc)[:240])

        fallback = self._generate_fallback_poster(paragraph, idx)
        return _to_jpeg_under_limit(fallback, max_bytes), "fallback"

    def _create_cover_jpeg(self, prompt: str, title: str, summary: str) -> tuple[bytes, str]:
        """Produce cover JPEG bytes plus a note on where they came from.

        Returns:
            ``(jpeg, "ai_900x383")`` when the model produced an image (with
            the title card overlaid), otherwise ``(jpeg, "fallback_900x383")``
            using the locally drawn cover.
        """
        max_bytes = self._max_upload_bytes()

        if self._image_client:
            try:
                raw = self._generate_with_model(prompt)
                if raw:
                    cover = _cover_to_jpeg(raw, max_bytes, title=title, summary=summary, overlay_title=True)
                    return cover, "ai_900x383"
            except Exception as exc:
                # Deliberate best effort: any model/download/decode failure
                # degrades to the local cover instead of failing the request.
                logger.warning("cover_ai_failed detail=%s", str(exc)[:240])

        fallback = self._generate_fallback_cover(title, summary)
        return _cover_to_jpeg(fallback, max_bytes), "fallback_900x383"

    def _generate_with_model(self, prompt: str) -> bytes | None:
        """Ask the image model for one image and return its raw bytes.

        The first item of the response is used.  Inline base64 data is
        preferred; otherwise the image URL is downloaded.

        Returns:
            The encoded image bytes, or ``None`` when no client is configured
            or the response carries neither base64 data nor a URL.

        Raises:
            Whatever the OpenAI client, base64 decoding or the HTTP download
            raises; callers treat any exception as "use the fallback".
        """
        if self._image_client is None:
            return None

        rsp = self._image_client.images.generate(
            model=settings.openai_image_model,
            prompt=prompt,
            size=settings.poster_image_size,
        )
        data = getattr(rsp, "data", None) or []
        if not data:
            return None
        first = data[0]

        def field(name: str) -> str:
            """Read a string field from a dict-style or attribute-style item."""
            value = first.get(name) if isinstance(first, dict) else getattr(first, name, "")
            return (value or "").strip()

        b64 = field("b64_json")
        if b64:
            return base64.b64decode(b64)

        image_url = field("url")
        if image_url:
            with httpx.Client(timeout=30) as client:
                r = client.get(image_url)
                r.raise_for_status()
                return r.content
        return None

    def _generate_fallback_poster(self, paragraph: str, idx: int) -> bytes:
        """Draw a 1080x1520 text poster locally and return it as PNG bytes.

        Args:
            paragraph: Text to print; whitespace is removed and the text is
                cut to 120 characters plus an ellipsis.
            idx: Zero-based paragraph index (shown one-based in the tag).
        """
        w, h = 1080, 1520
        im = Image.new("RGB", (w, h), (240, 246, 255))
        draw = ImageDraw.Draw(im)

        # Vertical gradient, drawn one row at a time.
        for y in range(h):
            c = int(240 - (y / h) * 36)
            draw.line([(0, y), (w, y)], fill=(c, c + 6, 255), width=1)

        # Decorative cascade of outlined rounded rectangles.
        for i in range(8):
            x0 = int(w * 0.08) + i * 54
            y0 = int(h * 0.66) + i * 22
            color = (160 - i * 8, 190 - i * 9, 230 - i * 8)
            draw.rounded_rectangle([x0, y0, x0 + 260, y0 + 100], radius=24, outline=color, width=2)

        tag_font = _pick_font(36)
        title_font = _pick_font(58)
        body_font = _pick_font(42)

        draw.rounded_rectangle([70, 70, 340, 142], radius=20, fill=(31, 77, 185))
        draw.text((102, 90), f"段落 {idx + 1}", font=tag_font, fill=(255, 255, 255))
        draw.text((70, 190), "AI 图文海报", font=title_font, fill=(16, 42, 102))

        words = re.sub(r"\s+", "", paragraph)
        if len(words) > 120:
            words = words[:120] + "…"
        draw.multiline_text(
            (72, 330),
            textwrap.fill(words, width=19),
            font=body_font,
            fill=(35, 54, 92),
            spacing=14,
            align="left",
        )

        buf = BytesIO()
        im.save(buf, format="PNG")
        return buf.getvalue()

    def _generate_fallback_cover(self, title: str, summary: str) -> bytes:
        """Draw a 900x383 cover locally and return it as PNG bytes.

        Args:
            title: Headline; whitespace removed, at most two 12-char lines.
            summary: Secondary text; whitespace removed, at most two
                24-char lines.  A placeholder is used when empty.
        """
        w, h = 900, 383
        im = Image.new("RGB", (w, h), (247, 249, 252))
        draw = ImageDraw.Draw(im)

        # Vertical gradient, drawn one row at a time.
        for y in range(h):
            t = y / h
            row_color = (int(252 - t * 28), int(250 - t * 18), int(241 - t * 8))
            draw.line([(0, y), (w, y)], fill=row_color, width=1)

        # Main card, then the decorative panel and emblem on the right.
        draw.rounded_rectangle([36, 34, 864, 349], radius=34, fill=(255, 255, 255), outline=(223, 229, 238), width=2)
        draw.rounded_rectangle([604, 60, 830, 290], radius=34, fill=(238, 244, 241))
        draw.ellipse([660, 95, 810, 245], fill=(229, 196, 122))
        draw.ellipse([690, 126, 780, 216], fill=(255, 250, 229))
        draw.arc([684, 118, 784, 226], start=20, end=168, fill=(199, 159, 81), width=5)

        tag_font = _pick_font(28)
        title_font = _pick_font(48)
        summary_font = _pick_font(24)
        small_font = _pick_font(20)

        draw.rounded_rectangle([72, 70, 226, 118], radius=18, fill=(31, 111, 91))
        draw.text((96, 80), "公众号封面", font=tag_font, fill=(255, 255, 255))

        clean_title = re.sub(r"\s+", "", title or "公众号文章")
        y = 146
        for line in textwrap.wrap(clean_title, width=12)[:2]:
            draw.text((72, y), line, font=title_font, fill=(23, 32, 51))
            y += 58

        clean_summary = re.sub(r"\s+", "", summary or "清晰表达主题,让读者一眼知道文章价值。")
        # Summary starts below the title but never above y=268.
        sy = max(y + 12, 268)
        for line in textwrap.wrap(clean_summary, width=24)[:2]:
            draw.text((74, sy), line, font=summary_font, fill=(104, 115, 133))
            sy += 32

        draw.text((72, 320), "900 x 383", font=small_font, fill=(140, 150, 166))
        buf = BytesIO()
        im.save(buf, format="PNG")
        return buf.getvalue()

    def _merge_body_with_posters(self, paragraphs: list[str], wechat_urls_by_para: dict[int, str]) -> str:
        """Rebuild the body with each uploaded poster placed before its paragraph.

        Args:
            paragraphs: Body paragraphs in order.
            wechat_urls_by_para: Poster URL keyed by zero-based paragraph
                index; blank or missing entries add nothing.

        Returns:
            Paragraphs and Markdown image lines joined by blank lines.  The
            first paragraph never receives an image.
        """
        merged: list[str] = []
        for idx, para in enumerate(paragraphs):
            if idx > 0:
                url = (wechat_urls_by_para.get(idx) or "").strip()
                if url:
                    # Alt text mirrors the uploaded file name (poster_pN.jpg).
                    merged.append(f"![poster_p{idx + 1}]({url})")
            merged.append(para)
        return "\n\n".join(merged)
|