Files
AIcreat/app/main.py
2026-04-28 12:10:27 +08:00

745 lines
30 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
from __future__ import annotations
import logging
from pathlib import Path
from urllib.parse import urlparse
import httpx
from fastapi import FastAPI, File, HTTPException, Request, Response, UploadFile
from fastapi.responses import HTMLResponse, RedirectResponse
from fastapi.staticfiles import StaticFiles
from fastapi.templating import Jinja2Templates
from app.config import settings
from app.logging_setup import configure_logging
from app.middleware import RequestContextMiddleware
from app.schemas import (
AIModelCreateRequest,
AIModelDeleteRequest,
AIModelSwitchRequest,
AuthCredentialRequest,
ChangePasswordRequest,
DeleteAccountRequest,
ForgotPasswordResetRequest,
IMPublishRequest,
PosterGenerateRequest,
RewriteRequest,
WechatCoverUploadByUrlRequest,
WechatCoverGenerateRequest,
WechatDeleteRequest,
WechatBindingRequest,
WechatPublishRequest,
WechatSwitchRequest,
VipRechargeRequest,
VipToggleRequest,
)
from app.services.ai_rewriter import AIRewriter
from app.services.im import IMPublisher
from app.services.poster_material import PosterMaterialService
from app.services.user_store import UserStore
from app.services.wechat import WechatPublisher
# Run the project's logging setup before the module logger is created.
configure_logging()
logger = logging.getLogger(__name__)
# The FastAPI application object; its title comes from settings.app_name.
app = FastAPI(title=settings.app_name)
@app.on_event("startup")
async def _log_startup() -> None:
    """Write a single info-level log line when the application starts."""
    startup_fields = (settings.app_name, True, settings.ai_soft_accept)
    logger.info(
        "app_start name=%s user_model_required=%s ai_soft_accept=%s",
        *startup_fields,
    )
# Register the project's request-context middleware on the application.
# (Handlers below read ``request.state.request_id``; presumably this
# middleware sets it -- TODO confirm in app.middleware.)
app.add_middleware(RequestContextMiddleware)
# Serve the files under app/static at the /static URL prefix.
app.mount("/static", StaticFiles(directory="app/static"), name="static")
# Jinja2 templates are loaded from app/templates.
templates = Jinja2Templates(directory="app/templates")
# Module-level service objects.
# NOTE(review): the handlers visible in this file construct fresh
# AIRewriter() / PosterMaterialService(wechat) instances per request instead
# of using ``rewriter`` / ``poster_material``; confirm whether these two
# module-level instances are still needed.
rewriter = AIRewriter()
wechat = WechatPublisher()
poster_material = PosterMaterialService(wechat)
im = IMPublisher()
# User/session store, constructed with settings.auth_db_path.
users = UserStore(settings.auth_db_path)
def _session_ttl(remember_me: bool) -> int:
    """Return the session lifetime in seconds.

    The regular lifetime is floored at 600 seconds, and the "remember me"
    lifetime is never shorter than the regular one.
    """
    base_ttl = max(600, int(settings.auth_session_ttl_sec))
    long_ttl = max(base_ttl, int(settings.auth_remember_session_ttl_sec))
    if remember_me:
        return long_ttl
    return base_ttl
def _current_user(request: Request) -> dict | None:
    """Resolve the session cookie on *request* to a user, or None without a cookie."""
    session_token = request.cookies.get(settings.auth_cookie_name, "")
    if not session_token:
        return None
    return users.get_user_by_session(session_token)
def _require_user(request: Request) -> dict | None:
    """Return the current user, or None when nobody is logged in.

    Any falsy lookup result is normalised to None; callers decide how to
    report the missing login.
    """
    return _current_user(request) or None
def _platform_model_cfg() -> dict:
    """Build a model configuration dict from the ``platform_openai_*`` settings."""
    return dict(
        api_key=settings.platform_openai_api_key or "",
        base_url=settings.platform_openai_base_url or "",
        model=settings.platform_openai_model,
        image_model=settings.platform_openai_image_model,
        timeout_sec=float(settings.platform_openai_timeout),
        max_output_tokens=int(settings.platform_openai_max_output_tokens),
        max_retries=int(settings.platform_openai_max_retries),
        model_name="平台模型",
    )
def _select_model_cfg(user_id: int, prefer_vip: bool = True) -> tuple[dict | None, str]:
    """Pick the model configuration for *user_id*.

    Returns ``(platform_cfg, "vip")`` when VIP is preferred, enabled, has a
    positive token balance, and the platform config has an API key.
    Otherwise returns ``(the user's active model config, "user")``; that
    config may be None.
    """
    vip_status = users.get_vip_status(user_id)
    if prefer_vip and vip_status.get("vip_enabled"):
        if int(vip_status.get("token_balance") or 0) > 0:
            platform_cfg = _platform_model_cfg()
            if platform_cfg.get("api_key"):
                return platform_cfg, "vip"
    return users.get_active_ai_model(user_id), "user"
def _estimate_rewrite_cost(req: RewriteRequest, result) -> int:
    """Estimate the token cost of a rewrite from its character counts.

    The stripped lengths of the source text and the result's body, title and
    summary are summed (minimum 1), rounded up to whole blocks of 1000
    characters, and multiplied by the per-block rate from settings (minimum 1).
    """
    pieces = (req.source_text, result.body_markdown, result.title, result.summary)
    total_chars = max(1, sum(len((piece or "").strip()) for piece in pieces))
    # Ceiling division by 1000 without floating point.
    block_count = -(-total_chars // 1000)
    rate = max(1, int(settings.vip_rewrite_token_per_1k_chars))
    return int(block_count * rate)
@app.get("/", response_class=HTMLResponse)
async def index(request: Request):
    """Render the main page, or redirect to the login page when not logged in."""
    if _current_user(request):
        context = {"request": request, "app_name": settings.app_name}
        return templates.TemplateResponse("index.html", context)
    return RedirectResponse(url="/auth?next=/", status_code=302)
@app.get("/auth", response_class=HTMLResponse)
async def auth_page(request: Request):
    """Render the login page, or redirect a logged-in user to ``next``.

    The ``next`` query parameter is attacker-controllable, so it is only
    honoured when it is a same-site absolute path. Anything else falls back
    to ``/`` to prevent an open redirect.
    """
    nxt = (request.query_params.get("next") or "/").strip() or "/"
    # Reject targets that can leave this site: values that do not start with
    # a single "/" (absolute URLs, "//host" scheme-relative URLs), values
    # containing a backslash (treated like "/" by some browsers), and control
    # characters (browsers strip tab/newline, so "/\t/host" becomes "//host").
    is_local_path = (
        nxt.startswith("/")
        and not nxt.startswith("//")
        and "\\" not in nxt
        and not any(ch < " " or ch == "\x7f" for ch in nxt)
    )
    if not is_local_path:
        nxt = "/"
    if _current_user(request):
        return RedirectResponse(url=nxt, status_code=302)
    return templates.TemplateResponse(
        "auth.html",
        {"request": request, "app_name": settings.app_name, "next": nxt},
    )
@app.get("/settings", response_class=HTMLResponse)
async def settings_page(request: Request):
    """Render the settings page, or redirect to the login page when not logged in."""
    if _current_user(request):
        context = {"request": request, "app_name": settings.app_name}
        return templates.TemplateResponse("settings.html", context)
    return RedirectResponse(url="/auth?next=/settings", status_code=302)
@app.get("/guide", response_class=HTMLResponse)
async def guide_page(request: Request):
    """Render the guide page, or redirect to the login page when not logged in."""
    if _current_user(request):
        context = {"request": request, "app_name": settings.app_name}
        return templates.TemplateResponse("guide.html", context)
    return RedirectResponse(url="/auth?next=/guide", status_code=302)
@app.get("/favicon.ico", include_in_schema=False)
async def favicon():
    """Redirect the conventional /favicon.ico request to the static SVG icon."""
    # Browsers usually request /favicon.ico; send them all to the static icon.
    icon_url = "/static/favicon.svg?v=20260428h"
    return RedirectResponse(url=icon_url)
@app.get("/api/config")
async def api_config(request: Request):
    """Report the caller's active model configuration for display.

    Includes whether a key is configured, the model names and the provider;
    the API key itself is not returned.
    """
    user = _current_user(request)
    cfg = (users.get_active_ai_model(user["id"]) if user else None) or {}

    base_url = cfg.get("base_url") or ""
    if "dashscope.aliyuncs.com" in base_url:
        provider = "dashscope"
    else:
        provider = "openai_compatible"
    host = urlparse(base_url).netloc if base_url else ""

    return {
        "openai_configured": bool(cfg.get("api_key")),
        "openai_model": cfg.get("model") or None,
        "openai_image_model": cfg.get("image_model") or settings.openai_image_model,
        "provider": provider,
        "base_url_host": host or None,
        "openai_timeout_sec": cfg.get("timeout_sec") or None,
        "openai_max_output_tokens": cfg.get("max_output_tokens") or None,
    }
@app.get("/api/auth/me")
async def auth_me(request: Request):
    """Describe the current login: user, WeChat bindings, AI models and VIP status."""
    user = _current_user(request)
    if not user:
        return {"ok": True, "logged_in": False}

    uid = user["id"]
    active_binding = users.get_active_wechat_binding(uid)
    all_bindings = users.list_wechat_bindings(uid)
    # A binding only counts as "bound" when both appid and secret are present.
    has_credentials = bool(
        active_binding and active_binding.get("appid") and active_binding.get("secret")
    )
    return {
        "ok": True,
        "logged_in": True,
        "user": {"id": uid, "username": user["username"]},
        "wechat_bound": has_credentials,
        "active_wechat_account": active_binding,
        "wechat_accounts": all_bindings,
        "active_ai_model": users.get_active_ai_model(uid),
        "ai_models": users.list_ai_models(uid),
        "vip": users.get_vip_status(uid),
    }
@app.post("/api/auth/register")
async def auth_register(req: AuthCredentialRequest, response: Response):
    """Create an account, open a session and set the session cookie.

    Validation and storage failures are reported as ``{"ok": False, ...}``
    bodies rather than HTTP errors.
    """
    name = (req.username or "").strip()
    secret = req.password or ""
    if len(name) < 2:
        return {"ok": False, "detail": "用户名至少 2 个字符"}
    if len(secret) < 6:
        return {"ok": False, "detail": "密码至少 6 个字符"}

    try:
        account = users.create_user(name, secret)
    except Exception as exc:
        logger.exception("auth_register_failed username=%s detail=%s", name, str(exc))
        return {"ok": False, "detail": "注册失败:账号库异常,请稍后重试"}
    if not account:
        return {"ok": False, "detail": "用户名已存在"}

    users.ensure_trial_tokens(account["id"], settings.vip_trial_tokens)
    lifetime = _session_ttl(bool(req.remember_me))
    session_token = users.create_session(account["id"], ttl_seconds=lifetime)
    cookie_options = dict(httponly=True, samesite="lax", max_age=lifetime, path="/")
    response.set_cookie(key=settings.auth_cookie_name, value=session_token, **cookie_options)
    return {
        "ok": True,
        "detail": "注册并登录成功,已赠送试用 token请保存重置码",
        "user": {"id": account["id"], "username": account["username"]},
        "reset_code": account.get("reset_code", ""),
    }
@app.post("/api/auth/login")
async def auth_login(req: AuthCredentialRequest, response: Response):
    """Verify credentials, open a session and set the session cookie."""
    name = (req.username or "").strip()
    try:
        account = users.verify_user(name, req.password or "")
    except Exception as exc:
        logger.exception("auth_login_failed username=%s detail=%s", name, str(exc))
        return {"ok": False, "detail": "登录失败:账号库异常,请稍后重试"}
    if not account:
        return {"ok": False, "detail": "用户名或密码错误"}

    lifetime = _session_ttl(bool(req.remember_me))
    session_token = users.create_session(account["id"], ttl_seconds=lifetime)
    cookie_options = dict(httponly=True, samesite="lax", max_age=lifetime, path="/")
    response.set_cookie(key=settings.auth_cookie_name, value=session_token, **cookie_options)
    return {"ok": True, "detail": "登录成功", "user": account}
@app.post("/api/auth/logout")
async def auth_logout(request: Request, response: Response):
    """Delete the caller's session (if any) and clear the session cookie."""
    cookie_name = settings.auth_cookie_name
    session_token = request.cookies.get(cookie_name, "")
    if session_token:
        users.delete_session(session_token)
    response.delete_cookie(settings.auth_cookie_name, path="/")
    return {"ok": True, "detail": "已退出登录"}
@app.get("/auth/forgot", response_class=HTMLResponse)
async def forgot_password_page(request: Request):
    """Render the password-reset page; no login is required."""
    context = {"request": request, "app_name": settings.app_name}
    return templates.TemplateResponse("forgot_password.html", context)
@app.post("/api/auth/password/forgot")
async def auth_forgot_password_reset(req: ForgotPasswordResetRequest):
    """Reset a password using the username and reset key supplied by the caller."""
    name = (req.username or "").strip()
    replacement = req.new_password or ""
    if len(name) < 2:
        return {"ok": False, "detail": "请输入正确的用户名"}
    if len(replacement) < 6:
        return {"ok": False, "detail": "新密码至少 6 个字符"}
    if users.reset_password_by_username(name, req.reset_key, replacement):
        return {"ok": True, "detail": "密码重置成功,请返回登录页重新登录"}
    return {"ok": False, "detail": "用户名或重置码错误,无法重置"}
@app.post("/api/auth/password/change")
async def auth_change_password(req: ChangePasswordRequest, request: Request, response: Response):
    """Change the logged-in user's password and issue a fresh session.

    On success every existing session of the user is deleted and a new
    non-"remember me" session cookie is set on the response.
    """
    user = _require_user(request)
    if not user:
        return {"ok": False, "detail": "请先登录"}

    current = req.old_password or ""
    replacement = req.new_password or ""
    if not current:
        return {"ok": False, "detail": "请输入当前密码"}
    if len(replacement) < 6:
        return {"ok": False, "detail": "新密码至少 6 个字符"}
    if current == replacement:
        return {"ok": False, "detail": "新密码不能与当前密码相同"}
    if not users.change_password(user["id"], current, replacement):
        return {"ok": False, "detail": "当前密码错误,修改失败"}

    users.delete_sessions_by_user(user["id"])
    lifetime = _session_ttl(False)
    session_token = users.create_session(user["id"], ttl_seconds=lifetime)
    cookie_options = dict(httponly=True, samesite="lax", max_age=lifetime, path="/")
    response.set_cookie(key=settings.auth_cookie_name, value=session_token, **cookie_options)
    return {"ok": True, "detail": "密码修改成功,已刷新登录状态"}
@app.post("/api/auth/account/delete")
async def auth_delete_account(req: DeleteAccountRequest, request: Request, response: Response):
    """Delete the logged-in account after checking password and reset key.

    On success the current session is deleted and the session cookie cleared.
    """
    user = _require_user(request)
    if not user:
        return {"ok": False, "detail": "请先登录"}
    if not (req.password or "").strip():
        return {"ok": False, "detail": "请输入当前密码"}
    if len((req.reset_key or "").strip()) < 4:
        return {"ok": False, "detail": "请输入重置码"}
    if not users.delete_user_logically(user["id"], req.password, req.reset_key):
        return {"ok": False, "detail": "密码或重置码错误,注销失败"}

    session_token = request.cookies.get(settings.auth_cookie_name, "")
    if session_token:
        users.delete_session(session_token)
    response.delete_cookie(settings.auth_cookie_name, path="/")
    return {"ok": True, "detail": "账号已注销,关联数据已清空"}
@app.post("/api/auth/wechat/bind")
async def auth_wechat_bind(req: WechatBindingRequest, request: Request):
    """Store a new WeChat official-account binding for the logged-in user."""
    user = _require_user(request)
    if not user:
        return {"ok": False, "detail": "请先登录"}

    def _clean(value) -> str:
        # Treat a missing field as empty and trim surrounding whitespace.
        return (value or "").strip()

    appid = _clean(req.appid)
    secret = _clean(req.secret)
    if not (appid and secret):
        return {"ok": False, "detail": "appid/secret 不能为空"}

    account = users.add_wechat_binding(
        user_id=user["id"],
        account_name=_clean(req.account_name) or "公众号账号",
        appid=appid,
        secret=secret,
        author=_clean(req.author),
        thumb_media_id=_clean(req.thumb_media_id),
        thumb_image_path=_clean(req.thumb_image_path),
    )
    return {"ok": True, "detail": "公众号账号绑定成功", "account": account}
@app.post("/api/auth/wechat/switch")
async def auth_wechat_switch(req: WechatSwitchRequest, request: Request):
    """Make one of the user's WeChat bindings the active one."""
    user = _require_user(request)
    if not user:
        return {"ok": False, "detail": "请先登录"}
    if users.switch_active_wechat_binding(user["id"], int(req.account_id)):
        return {"ok": True, "detail": "已切换当前公众号账号"}
    return {"ok": False, "detail": "切换失败:账号不存在或无权限"}
@app.post("/api/auth/wechat/delete")
async def auth_wechat_delete(req: WechatDeleteRequest, request: Request):
    """Delete one of the user's WeChat bindings."""
    user = _require_user(request)
    if not user:
        return {"ok": False, "detail": "请先登录"}
    if users.delete_wechat_binding(user["id"], int(req.account_id)):
        return {"ok": True, "detail": "公众号账号已删除"}
    return {"ok": False, "detail": "删除失败:账号不存在或无权限"}
@app.post("/api/auth/ai-models/add")
async def auth_ai_model_add(req: AIModelCreateRequest, request: Request):
    """Save a new AI model configuration for the logged-in user.

    Numeric options are clamped: timeout to at least 10 seconds, output
    tokens to at least 256, retries to at least 0.
    """
    user = _require_user(request)
    if not user:
        return {"ok": False, "detail": "请先登录"}

    api_key = (req.api_key or "").strip()
    if not api_key:
        return {"ok": False, "detail": "API Key 不能为空"}
    model_id = (req.model or "").strip()
    if not model_id:
        return {"ok": False, "detail": "模型名不能为空"}

    saved = users.add_ai_model(
        user_id=user["id"],
        model_name=(req.model_name or "").strip(),
        api_key=api_key,
        base_url=(req.base_url or "").strip(),
        model=model_id,
        image_model=(req.image_model or "").strip(),
        timeout_sec=max(10.0, float(req.timeout_sec or 120.0)),
        max_output_tokens=max(256, int(req.max_output_tokens or 8192)),
        max_retries=max(0, int(req.max_retries or 0)),
    )
    return {"ok": True, "detail": "模型配置已保存并设为当前", "model_config": saved}
@app.post("/api/auth/ai-models/switch")
async def auth_ai_model_switch(req: AIModelSwitchRequest, request: Request):
    """Make one of the user's AI model configurations the active one."""
    user = _require_user(request)
    if not user:
        return {"ok": False, "detail": "请先登录"}
    if users.switch_active_ai_model(user["id"], int(req.model_id)):
        return {"ok": True, "detail": "已切换当前模型"}
    return {"ok": False, "detail": "切换失败:模型不存在或无权限"}
@app.post("/api/auth/ai-models/delete")
async def auth_ai_model_delete(req: AIModelDeleteRequest, request: Request):
    """Delete one of the user's AI model configurations."""
    user = _require_user(request)
    if not user:
        return {"ok": False, "detail": "请先登录"}
    if users.delete_ai_model(user["id"], int(req.model_id)):
        return {"ok": True, "detail": "模型配置已删除"}
    return {"ok": False, "detail": "删除失败:模型不存在或无权限"}
@app.post("/api/rewrite")
async def rewrite(req: RewriteRequest, request: Request):
    """Rewrite the submitted text with the caller's active AI model.

    The global ``openai_*`` settings are temporarily overwritten with the
    user's model configuration for the duration of the (synchronous)
    rewrite call and restored afterwards, even on failure.

    Raises:
        HTTPException: 401 when not logged in, 400 when the user has no
            AI model configured.
    """
    rid = getattr(request.state, "request_id", "")
    source_text = req.source_text or ""
    logger.info(
        "api_rewrite_in rid=%s source_chars=%d title_hint_chars=%d tone=%s audience=%s "
        "keep_points_chars=%d avoid_words_chars=%d target_body_chars=%d",
        rid,
        len(source_text),
        len(req.title_hint or ""),
        req.tone,
        req.audience,
        len(req.keep_points or ""),
        len(req.avoid_words or ""),
        int(req.target_body_chars or 500),
    )

    user = _require_user(request)
    if not user:
        raise HTTPException(status_code=401, detail="请先登录")
    model_cfg = users.get_active_ai_model(user["id"])
    if not model_cfg:
        raise HTTPException(status_code=400, detail="请先在设置页配置 AI 模型")

    # Snapshot every setting that is (or may be) overridden so it can be restored.
    swapped_fields = (
        "openai_api_key",
        "openai_base_url",
        "openai_model",
        "openai_timeout",
        "openai_max_output_tokens",
        "openai_max_retries",
        "openai_image_model",
    )
    saved = {field: getattr(settings, field) for field in swapped_fields}
    try:
        settings.openai_api_key = model_cfg.get("api_key") or ""
        settings.openai_base_url = model_cfg.get("base_url") or ""
        settings.openai_model = model_cfg.get("model") or ""
        settings.openai_timeout = float(model_cfg.get("timeout_sec") or 120.0)
        settings.openai_max_output_tokens = int(model_cfg.get("max_output_tokens") or 8192)
        settings.openai_max_retries = int(model_cfg.get("max_retries") or 0)
        result = AIRewriter().rewrite(req, request_id=rid)
    finally:
        for field, previous in saved.items():
            setattr(settings, field, previous)

    trace = result.trace or {}
    logger.info(
        "api_rewrite_out rid=%s mode=%s duration_ms=%s quality_notes=%d trace_steps=%s soft_accept=%s",
        rid,
        result.mode,
        trace.get("duration_ms"),
        len(result.quality_notes or []),
        len(trace.get("steps") or []),
        trace.get("quality_soft_accept"),
    )
    return result
@app.post("/api/publish/wechat")
async def publish_wechat(req: WechatPublishRequest, request: Request):
    """Publish the article as a WeChat draft using the user's active binding."""
    user = _require_user(request)
    if not user:
        return {"ok": False, "detail": "请先登录"}
    binding = users.get_active_wechat_binding(user["id"])
    if not binding:
        return {"ok": False, "detail": "当前账号未绑定公众号 token请先在页面绑定"}

    rid = getattr(request.state, "request_id", "")
    author_given = bool((req.author or "").strip())
    logger.info(
        "api_wechat_in rid=%s title_chars=%d summary_chars=%d body_md_chars=%d author_set=%s",
        rid,
        len(req.title or ""),
        len(req.summary or ""),
        len(req.body_markdown or ""),
        author_given,
    )
    out = await wechat.publish_draft(req, request_id=rid, account=binding)

    # Only a dict payload can carry an errcode; anything else logs as None.
    payload = out.data
    errcode = payload.get("errcode") if isinstance(payload, dict) else None
    logger.info(
        "api_wechat_out rid=%s ok=%s wechat_errcode=%s detail_preview=%s",
        rid,
        out.ok,
        errcode,
        (out.detail or "")[:240],
    )
    return out
@app.post("/api/wechat/cover/upload")
async def upload_wechat_cover(request: Request, file: UploadFile = File(...)):
    """Upload a user-supplied file as a WeChat cover via the active binding."""
    user = _require_user(request)
    if not user:
        return {"ok": False, "detail": "请先登录"}
    binding = users.get_active_wechat_binding(user["id"])
    if not binding:
        return {"ok": False, "detail": "当前账号未绑定公众号 token请先在页面绑定"}

    rid = getattr(request.state, "request_id", "")
    filename = file.filename or "cover.jpg"
    payload = await file.read()
    logger.info(
        "api_wechat_cover_upload_in rid=%s filename=%s bytes=%d", rid, filename, len(payload)
    )
    out = await wechat.upload_cover(filename, payload, request_id=rid, account=binding)
    detail_preview = (out.detail or "")[:160]
    logger.info(
        "api_wechat_cover_upload_out rid=%s ok=%s detail=%s",
        rid,
        out.ok,
        detail_preview,
    )
    return out
@app.post("/api/wechat/cover/upload-by-url")
async def upload_wechat_cover_by_url(req: WechatCoverUploadByUrlRequest, request: Request):
    """Download an image from a URL and upload it as a WeChat cover.

    The response body is streamed and the download is aborted as soon as it
    exceeds 10 MB, so an oversized or endless response is never fully
    buffered in memory. All failures are reported as ``{"ok": False, ...}``.
    """
    user = _require_user(request)
    if not user:
        return {"ok": False, "detail": "请先登录"}
    binding = users.get_active_wechat_binding(user["id"])
    if not binding:
        return {"ok": False, "detail": "当前账号未绑定公众号 token请先在页面绑定"}
    rid = getattr(request.state, "request_id", "")
    image_url = (req.image_url or "").strip()
    if not image_url:
        return {"ok": False, "detail": "图片 URL 不能为空"}
    parsed = urlparse(image_url)
    if parsed.scheme not in {"http", "https"}:
        return {"ok": False, "detail": "仅支持 http/https 图片地址"}

    # NOTE(review): the server fetches a caller-supplied URL and follows
    # redirects, so internal/private hosts are reachable (SSRF). Consider an
    # allow-list or a resolved-address check before fetching.
    max_bytes = 10 * 1024 * 1024
    chunks: list[bytes] = []
    received = 0
    too_large = False
    try:
        async with httpx.AsyncClient(timeout=30, follow_redirects=True) as client:
            async with client.stream("GET", image_url) as resp:
                status_code = resp.status_code
                content_type = (resp.headers.get("content-type") or "").lower()
                # Only read the body when it will actually be used.
                if status_code < 400 and content_type.startswith("image/"):
                    async for chunk in resp.aiter_bytes():
                        received += len(chunk)
                        if received > max_bytes:
                            too_large = True
                            break
                        chunks.append(chunk)
    except Exception as exc:
        logger.warning("api_wechat_cover_url_fetch_fail rid=%s url=%s err=%s", rid, image_url, exc)
        return {"ok": False, "detail": "图片下载失败,请检查 URL 是否可访问"}

    if status_code >= 400:
        return {"ok": False, "detail": f"图片下载失败HTTP {status_code}"}
    if not content_type.startswith("image/"):
        return {"ok": False, "detail": "URL 指向的内容不是图片"}
    if too_large:
        return {"ok": False, "detail": "图片过大,请使用 10MB 以内的图片"}
    content = b"".join(chunks)
    if not content:
        return {"ok": False, "detail": "图片内容为空"}

    # Keep a recognised extension from the URL path; otherwise derive one
    # from the content type.
    ext = Path(parsed.path or "").suffix.lower()
    if ext not in {".jpg", ".jpeg", ".png", ".webp"}:
        ext = ".png" if "png" in content_type else ".jpg"
    fn = f"cover_from_url{ext}"
    logger.info("api_wechat_cover_upload_url_in rid=%s url=%s bytes=%d", rid, image_url, len(content))
    out = await wechat.upload_cover(fn, content, request_id=rid, account=binding)
    logger.info(
        "api_wechat_cover_upload_url_out rid=%s ok=%s detail=%s",
        rid,
        out.ok,
        (out.detail or "")[:160],
    )
    return out
@app.post("/api/wechat/cover/generate")
async def generate_wechat_cover(req: WechatCoverGenerateRequest, request: Request):
    """Generate a cover image using the caller's active AI model.

    The global ``openai_*`` settings are temporarily replaced with the user's
    model configuration (or blank defaults when none is configured) while the
    generation call is awaited, then restored. Because that swap spans an
    ``await``, it is serialised with a lock so concurrent requests cannot see
    or permanently leave behind another user's credentials.
    """
    import asyncio  # local import: the module does not import asyncio at top level

    user = _require_user(request)
    if not user:
        return {"ok": False, "detail": "请先登录"}
    binding = users.get_active_wechat_binding(user["id"]) if req.upload_to_wechat else None
    if req.upload_to_wechat and not binding:
        return {"ok": False, "detail": "当前账号未绑定公众号 token请先在页面绑定"}
    rid = getattr(request.state, "request_id", "")
    logger.info(
        "api_wechat_cover_generate_in rid=%s title_chars=%d summary_chars=%d upload_to_wechat=%s",
        rid,
        len(req.title or ""),
        len(req.summary or ""),
        req.upload_to_wechat,
    )
    model_cfg = users.get_active_ai_model(user["id"])

    # One lock, stored on app.state, guards every swap of the global settings.
    # There is no await between the lookup and the assignment, so creating it
    # lazily here is safe on the event loop.
    swap_lock = getattr(app.state, "settings_swap_lock", None)
    if swap_lock is None:
        swap_lock = asyncio.Lock()
        app.state.settings_swap_lock = swap_lock

    swapped_fields = (
        "openai_api_key",
        "openai_base_url",
        "openai_model",
        "openai_timeout",
        "openai_max_output_tokens",
        "openai_max_retries",
        "openai_image_model",
    )
    async with swap_lock:
        backup = {field: getattr(settings, field) for field in swapped_fields}
        try:
            if model_cfg:
                settings.openai_api_key = model_cfg.get("api_key") or ""
                settings.openai_base_url = model_cfg.get("base_url") or ""
                settings.openai_model = model_cfg.get("model") or ""
                settings.openai_timeout = float(model_cfg.get("timeout_sec") or 120.0)
                settings.openai_max_output_tokens = int(model_cfg.get("max_output_tokens") or 8192)
                settings.openai_max_retries = int(model_cfg.get("max_retries") or 0)
                # Fall back to the globally configured image model when the
                # user's config does not name one.
                settings.openai_image_model = (
                    (model_cfg.get("image_model") or "").strip() or backup["openai_image_model"]
                )
            else:
                settings.openai_api_key = ""
                settings.openai_base_url = ""
                settings.openai_model = ""
                settings.openai_timeout = 120.0
                settings.openai_max_output_tokens = 8192
                settings.openai_max_retries = 0
                settings.openai_image_model = backup["openai_image_model"]
            out = await PosterMaterialService(wechat).generate_cover(req, request_id=rid, account=binding)
        finally:
            for field, previous in backup.items():
                setattr(settings, field, previous)

    logger.info(
        "api_wechat_cover_generate_out rid=%s ok=%s thumb=%s note=%s warnings=%d",
        rid,
        out.ok,
        bool(out.thumb_media_id),
        out.note,
        len(out.warnings),
    )
    return out
@app.post("/api/wechat/material/upload")
async def upload_wechat_material(request: Request, file: UploadFile = File(...)):
    """Upload a user-supplied file as WeChat body material via the active binding."""
    user = _require_user(request)
    if not user:
        return {"ok": False, "detail": "请先登录"}
    binding = users.get_active_wechat_binding(user["id"])
    if not binding:
        return {"ok": False, "detail": "当前账号未绑定公众号 token请先在页面绑定"}

    rid = getattr(request.state, "request_id", "")
    filename = file.filename or "material.jpg"
    payload = await file.read()
    logger.info(
        "api_wechat_material_upload_in rid=%s filename=%s bytes=%d", rid, filename, len(payload)
    )
    out = await wechat.upload_body_material(filename, payload, request_id=rid, account=binding)
    detail_preview = (out.detail or "")[:160]
    logger.info(
        "api_wechat_material_upload_out rid=%s ok=%s detail=%s",
        rid,
        out.ok,
        detail_preview,
    )
    return out
@app.post("/api/material/posters/generate")
async def generate_posters(req: PosterGenerateRequest, request: Request):
    """Generate poster images using the caller's active AI model.

    The global ``openai_*`` settings are temporarily replaced with the user's
    model configuration (or blank defaults when none is configured) while the
    generation call is awaited, then restored. Because that swap spans an
    ``await``, it is serialised with a lock so concurrent requests cannot see
    or permanently leave behind another user's credentials.
    """
    import asyncio  # local import: the module does not import asyncio at top level

    user = _require_user(request)
    if not user:
        return {"ok": False, "detail": "请先登录"}
    binding = users.get_active_wechat_binding(user["id"]) if req.upload_to_wechat else None
    if req.upload_to_wechat and not binding:
        return {"ok": False, "detail": "当前账号未绑定公众号 token请先在页面绑定"}
    rid = getattr(request.state, "request_id", "")
    logger.info(
        "api_poster_generate_in rid=%s body_chars=%d upload_to_wechat=%s max_images=%d",
        rid,
        len(req.body_markdown or ""),
        req.upload_to_wechat,
        int(req.max_images or 0),
    )
    model_cfg = users.get_active_ai_model(user["id"])

    # One lock, stored on app.state, guards every swap of the global settings.
    # There is no await between the lookup and the assignment, so creating it
    # lazily here is safe on the event loop.
    swap_lock = getattr(app.state, "settings_swap_lock", None)
    if swap_lock is None:
        swap_lock = asyncio.Lock()
        app.state.settings_swap_lock = swap_lock

    swapped_fields = (
        "openai_api_key",
        "openai_base_url",
        "openai_model",
        "openai_timeout",
        "openai_max_output_tokens",
        "openai_max_retries",
        "openai_image_model",
    )
    async with swap_lock:
        backup = {field: getattr(settings, field) for field in swapped_fields}
        try:
            if model_cfg:
                settings.openai_api_key = model_cfg.get("api_key") or ""
                settings.openai_base_url = model_cfg.get("base_url") or ""
                settings.openai_model = model_cfg.get("model") or ""
                settings.openai_timeout = float(model_cfg.get("timeout_sec") or 120.0)
                settings.openai_max_output_tokens = int(model_cfg.get("max_output_tokens") or 8192)
                settings.openai_max_retries = int(model_cfg.get("max_retries") or 0)
                # Fall back to the globally configured image model when the
                # user's config does not name one.
                settings.openai_image_model = (
                    (model_cfg.get("image_model") or "").strip() or backup["openai_image_model"]
                )
            else:
                settings.openai_api_key = ""
                settings.openai_base_url = ""
                settings.openai_model = ""
                settings.openai_timeout = 120.0
                settings.openai_max_output_tokens = 8192
                settings.openai_max_retries = 0
                settings.openai_image_model = backup["openai_image_model"]
            out = await PosterMaterialService(wechat).generate(req, request_id=rid, account=binding)
        finally:
            for field, previous in backup.items():
                setattr(settings, field, previous)

    logger.info(
        "api_poster_generate_out rid=%s ok=%s posters=%d warnings=%d",
        rid,
        out.ok,
        len(out.posters),
        len(out.warnings),
    )
    return out
@app.post("/api/publish/im")
async def publish_im(req: IMPublishRequest, request: Request):
    """Publish the article through the module-level IM publisher.

    Requires a logged-in user, like the other publish endpoints in this file;
    an anonymous caller gets ``{"ok": False, "detail": ...}`` and nothing is
    sent.
    """
    # Without this check any anonymous client could post through the
    # server-side IM publisher.
    user = _require_user(request)
    if not user:
        return {"ok": False, "detail": "请先登录"}
    rid = getattr(request.state, "request_id", "")
    logger.info(
        "api_im_in rid=%s title_chars=%d body_md_chars=%d",
        rid,
        len(req.title or ""),
        len(req.body_markdown or ""),
    )
    out = await im.publish(req, request_id=rid)
    logger.info("api_im_out rid=%s ok=%s detail=%s", rid, out.ok, (out.detail or "")[:120])
    return out