Files
AIcreat/app/main.py
2026-04-28 18:36:38 +08:00

1338 lines
55 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
from __future__ import annotations
import logging
import math
import re
import secrets
import time
import uuid
from pathlib import Path
from urllib.parse import urlparse
import httpx
from fastapi import FastAPI, File, HTTPException, Request, Response, UploadFile
from fastapi.responses import HTMLResponse, RedirectResponse
from fastapi.staticfiles import StaticFiles
from fastapi.templating import Jinja2Templates
from app.config import settings
from app.logging_setup import configure_logging
from app.middleware import RequestContextMiddleware
from app.schemas import (
AIModelCreateRequest,
AIModelDeleteRequest,
AIImageModelUpdateRequest,
AIModelSwitchRequest,
AuthCredentialRequest,
BillingRechargeCreateRequest,
BillingPayNowRequest,
BillingRechargeNotifyRequest,
ChangePasswordRequest,
DeleteAccountRequest,
ForgotPasswordResetRequest,
IMPublishRequest,
PosterGenerateRequest,
RewriteRequest,
WechatCoverUploadByUrlRequest,
WechatCoverGenerateRequest,
WechatDeleteRequest,
WechatBindingRequest,
WechatPublishRequest,
WechatSwitchRequest,
UserProfileUpdateRequest,
VipRechargeRequest,
VipToggleRequest,
)
from app.services.ai_rewriter import AIRewriter
from app.services.im import IMPublisher
from app.services.poster_material import PosterMaterialService
from app.services.user_store import UserStore
from app.services.wechat import WechatPublisher
configure_logging()
logger = logging.getLogger(__name__)
app = FastAPI(title=settings.app_name)
@app.on_event("startup")
async def _log_startup() -> None:
logger.info(
"app_start name=%s user_model_required=%s ai_soft_accept=%s",
settings.app_name,
True,
settings.ai_soft_accept,
)
app.add_middleware(RequestContextMiddleware)
app.mount("/static", StaticFiles(directory="app/static"), name="static")
templates = Jinja2Templates(directory="app/templates")
rewriter = AIRewriter()
wechat = WechatPublisher()
poster_material = PosterMaterialService(wechat)
im = IMPublisher()
users = UserStore(settings.auth_db_path)
_register_rate: dict[str, list[float]] = {}
_login_rate: dict[str, list[float]] = {}
_challenge_pool: dict[str, dict] = {}
USERNAME_RE = re.compile(r"^[A-Za-z0-9_]{4,24}$")
PASSWORD_STRONG_RE = re.compile(r"^(?=.*[a-z])(?=.*[A-Z])(?=.*\d)(?=.*[^A-Za-z0-9])\S{10,64}$")
def _session_ttl(remember_me: bool) -> int:
normal = max(600, int(settings.auth_session_ttl_sec))
remembered = max(normal, int(settings.auth_remember_session_ttl_sec))
return remembered if remember_me else normal
def _current_user(request: Request) -> dict | None:
token = request.cookies.get(settings.auth_cookie_name, "")
return users.get_user_by_session(token) if token else None
def _require_user(request: Request) -> dict | None:
u = _current_user(request)
if not u:
return None
return u
def _client_ip(request: Request) -> str:
xff = (request.headers.get("x-forwarded-for") or "").strip()
if xff:
return xff.split(",")[0].strip()
return (request.client.host if request.client else "") or "unknown"
def _hit_limit(bucket: dict[str, list[float]], key: str, limit: int, window_sec: int) -> bool:
now = time.time()
points = bucket.get(key, [])
points = [p for p in points if now - p <= window_sec]
points.append(now)
bucket[key] = points
return len(points) > limit
def _create_challenge() -> dict:
a = secrets.randbelow(8) + 2
b = secrets.randbelow(9) + 1
op = "+" if secrets.randbelow(2) == 0 else "-"
if op == "-":
a, b = max(a, b) + 5, min(a, b)
answer = str(a + b) if op == "+" else str(a - b)
cid = secrets.token_urlsafe(16)
_challenge_pool[cid] = {"answer": answer, "created_at": time.time(), "used": False}
return {"challenge_id": cid, "question": f"{a} {op} {b} = ?"}
def _verify_challenge(req: AuthCredentialRequest) -> tuple[bool, str]:
cid = (req.challenge_id or "").strip()
ans = (req.challenge_answer or "").strip()
if req.honeypot:
return False, "请求被拒绝"
if not cid or not ans:
return False, "请先完成人机校验"
item = _challenge_pool.get(cid)
if not item:
return False, "校验已失效,请刷新后重试"
if item.get("used"):
return False, "校验已使用,请刷新后重试"
age = time.time() - float(item.get("created_at") or 0)
if age < 2:
return False, "提交过快,请稍后重试"
if age > 300:
return False, "校验已过期,请刷新后重试"
if ans != str(item.get("answer") or ""):
return False, "人机校验答案错误"
item["used"] = True
return True, ""
def _validate_username_password(username: str, password: str) -> tuple[bool, str]:
if not USERNAME_RE.match(username):
return False, "用户名需为 4-24 位,仅支持字母/数字/下划线"
if not PASSWORD_STRONG_RE.match(password):
return False, "密码需 10-64 位,且包含大小写字母、数字和特殊字符"
if username.lower() in password.lower():
return False, "密码不能包含用户名"
return True, ""
def _platform_model_cfg() -> dict:
return {
"api_key": settings.platform_openai_api_key or "",
"base_url": settings.platform_openai_base_url or "",
"model": settings.platform_openai_model,
"image_model": settings.platform_openai_image_model,
"timeout_sec": float(settings.platform_openai_timeout),
"max_output_tokens": int(settings.platform_openai_max_output_tokens),
"max_retries": int(settings.platform_openai_max_retries),
"model_name": "平台模型",
}
def _select_model_cfg(user_id: int, prefer_vip: bool = True) -> tuple[dict | None, str]:
vip = users.get_vip_status(user_id)
if prefer_vip and vip.get("vip_enabled"):
if int(vip.get("total_available_credits") or 0) <= 0:
return None, "vip_empty"
cfg = _platform_model_cfg()
if cfg.get("api_key"):
return cfg, "vip"
cfg = users.get_active_ai_model(user_id)
return cfg, "user"
def _quota_detail() -> str:
return "Credits 额度已用完(席位额度+共享加油包),请充值或等待下个计费周期"
def _credits_from_cny(amount_cny: float) -> int:
package_credits = max(1, int(settings.credits_recharge_package_credits))
package_amount = max(0.01, float(settings.credits_recharge_package_amount))
cny = max(0.0, float(amount_cny))
if cny <= 0:
return 0
return max(1, int(math.ceil((cny * package_credits) / package_amount)))
def _estimate_rewrite_cost(req: RewriteRequest, result) -> int:
trace = getattr(result, "trace", None) or {}
usage = trace.get("usage") if isinstance(trace, dict) else {}
usage_total_tokens = int((usage or {}).get("total_tokens") or 0)
if usage_total_tokens > 0:
token_price_cny = max(0.0, float(settings.credits_token_price_per_million_cny))
return _credits_from_cny((usage_total_tokens / 1_000_000.0) * token_price_cny)
src_chars = len((req.source_text or "").strip())
out_chars = len((result.body_markdown or "").strip()) + len((result.title or "").strip()) + len((result.summary or "").strip())
total_chars = max(1, src_chars + out_chars)
estimated_tokens = int(total_chars * 1.8)
token_price_cny = max(0.0, float(settings.credits_token_price_per_million_cny))
return _credits_from_cny((max(1, estimated_tokens) / 1_000_000.0) * token_price_cny)
def _estimate_image_cost(image_count: int) -> int:
cnt = max(0, int(image_count))
if cnt <= 0:
return 0
pkg_images = max(1, int(settings.credits_image_price_package_images or 160))
pkg_cny = max(0.0, float(settings.credits_image_price_package_cny or 0.75))
return _credits_from_cny((cnt / pkg_images) * pkg_cny)
def _new_order_no() -> str:
return f"RC{time.strftime('%Y%m%d%H%M%S')}{uuid.uuid4().hex[:10].upper()}"
async def _create_recharge_order_with_pay_url(
user_id: int,
username: str,
credits: int,
callback_url: str,
order_meta: dict | None = None,
channel: str = "wechat",
) -> tuple[dict, str]:
package_credits = max(1, int(settings.credits_recharge_package_credits))
package_amount = float(settings.credits_recharge_package_amount)
amount_cny = round((int(credits) / package_credits) * package_amount, 2)
order_no = _new_order_no()
order = users.create_recharge_order(
user_id=user_id,
order_no=order_no,
channel=(channel or "wechat").strip() or "wechat",
token_amount=int(credits),
amount_cny=amount_cny,
meta={"username": username, **(order_meta or {})},
)
pay_payload = {
"order_no": order_no,
"user_id": int(user_id),
"username": username,
"callback_url": callback_url,
**order,
}
pay_url = ""
if settings.shop_backend_create_order_url:
try:
async with httpx.AsyncClient(timeout=15) as client:
r = await client.post(
settings.shop_backend_create_order_url,
json=pay_payload,
headers={"X-Shop-Token": settings.shop_backend_callback_token or ""},
)
body = r.json() if r.content else {}
if r.status_code < 400 and isinstance(body, dict):
pay_url = str(body.get("pay_url") or "")
except Exception as exc:
logger.warning("shop_create_order_failed order=%s err=%s", order_no, exc)
return order, pay_url
async def _fetch_pay_url_for_order(order: dict, user_id: int, username: str, callback_url: str) -> str:
pay_payload = {
"order_no": order.get("order_no", ""),
"user_id": int(user_id),
"username": username,
"callback_url": callback_url,
**order,
}
pay_url = ""
if settings.shop_backend_create_order_url:
try:
async with httpx.AsyncClient(timeout=15) as client:
r = await client.post(
settings.shop_backend_create_order_url,
json=pay_payload,
headers={"X-Shop-Token": settings.shop_backend_callback_token or ""},
)
body = r.json() if r.content else {}
if r.status_code < 400 and isinstance(body, dict):
pay_url = str(body.get("pay_url") or "")
except Exception as exc:
logger.warning("shop_repay_order_failed order=%s err=%s", order.get("order_no", ""), exc)
return pay_url
@app.get("/", response_class=HTMLResponse)
async def index(request: Request):
if not _current_user(request):
return RedirectResponse(url="/auth?next=/", status_code=302)
return templates.TemplateResponse("index.html", {"request": request, "app_name": settings.app_name})
@app.get("/auth", response_class=HTMLResponse)
async def auth_page(request: Request):
nxt = (request.query_params.get("next") or "/").strip() or "/"
if _current_user(request):
return RedirectResponse(url=nxt, status_code=302)
return templates.TemplateResponse(
"auth.html",
{"request": request, "app_name": settings.app_name, "next": nxt},
)
@app.get("/settings", response_class=HTMLResponse)
async def settings_page(request: Request):
if not _current_user(request):
return RedirectResponse(url="/auth?next=/settings", status_code=302)
return templates.TemplateResponse("settings.html", {"request": request, "app_name": settings.app_name})
@app.get("/billing", response_class=HTMLResponse)
async def billing_page(request: Request):
if not _current_user(request):
return RedirectResponse(url="/auth?next=/billing", status_code=302)
return templates.TemplateResponse(
"billing.html",
{
"request": request,
"app_name": settings.app_name,
"package_amount": settings.credits_recharge_package_amount,
"package_credits": settings.credits_recharge_package_credits,
},
)
@app.get("/upgrade", response_class=HTMLResponse)
async def upgrade_page(request: Request):
if not _current_user(request):
return RedirectResponse(url="/auth?next=/upgrade", status_code=302)
return templates.TemplateResponse(
"upgrade.html",
{
"request": request,
"app_name": settings.app_name,
"trial_tokens": settings.vip_trial_tokens,
"rewrite_cost": settings.credits_per_million_tokens,
"image_cost": settings.credits_per_120_images,
"seat_price": settings.credits_standard_seat_price_cny,
"seat_quota": settings.credits_seat_monthly_quota,
"package_amount": settings.credits_recharge_package_amount,
"package_credits": settings.credits_recharge_package_credits,
},
)
@app.get("/profile", response_class=HTMLResponse)
async def profile_page(request: Request):
if not _current_user(request):
return RedirectResponse(url="/auth?next=/profile", status_code=302)
return templates.TemplateResponse("profile.html", {"request": request, "app_name": settings.app_name})
@app.get("/guide", response_class=HTMLResponse)
async def guide_page(request: Request):
if not _current_user(request):
return RedirectResponse(url="/auth?next=/guide", status_code=302)
return templates.TemplateResponse("guide.html", {"request": request, "app_name": settings.app_name})
@app.get("/favicon.ico", include_in_schema=False)
async def favicon():
# 浏览器通常请求 /favicon.ico统一跳转到静态图标
return RedirectResponse(url="/static/favicon.svg?v=20260428h")
@app.get("/api/config")
async def api_config(request: Request):
"""供页面展示:当前是否接入模型、模型名、提供方(不含密钥)。"""
user = _current_user(request)
model_cfg = users.get_active_ai_model(user["id"]) if user else None
base = (model_cfg or {}).get("base_url") or ""
provider = "dashscope" if "dashscope.aliyuncs.com" in base else "openai_compatible"
host = urlparse(base).netloc if base else ""
model_name = (model_cfg or {}).get("model") or None
image_model_name = (model_cfg or {}).get("image_model") or settings.openai_image_model
timeout_sec = (model_cfg or {}).get("timeout_sec") or None
max_output_tokens = (model_cfg or {}).get("max_output_tokens") or None
key_configured = bool((model_cfg or {}).get("api_key"))
return {
"openai_configured": key_configured,
"openai_model": model_name,
"openai_image_model": image_model_name,
"provider": provider,
"base_url_host": host or None,
"openai_timeout_sec": timeout_sec,
"openai_max_output_tokens": max_output_tokens,
}
@app.get("/api/auth/me")
async def auth_me(request: Request):
user = _current_user(request)
if not user:
return {"ok": True, "logged_in": False}
binding = users.get_active_wechat_binding(user["id"])
bindings = users.list_wechat_bindings(user["id"])
return {
"ok": True,
"logged_in": True,
"user": {"id": user["id"], "username": user["username"]},
"wechat_bound": bool(binding and binding.get("appid") and binding.get("secret")),
"active_wechat_account": binding,
"wechat_accounts": bindings,
"active_ai_model": users.get_active_ai_model(user["id"]),
"ai_models": users.list_ai_models(user["id"]),
"vip": users.get_vip_status(user["id"]),
}
@app.get("/api/profile")
async def api_profile(request: Request):
user = _require_user(request)
if not user:
return {"ok": False, "detail": "请先登录"}
return {"ok": True, "profile": users.get_user_profile(user["id"])}
@app.post("/api/profile")
async def api_profile_update(req: UserProfileUpdateRequest, request: Request):
user = _require_user(request)
if not user:
return {"ok": False, "detail": "请先登录"}
profile = users.save_user_profile(
user["id"],
subscriber_name=(req.subscriber_name or "").strip(),
subscriber_phone=(req.subscriber_phone or "").strip(),
shipping_address=(req.shipping_address or "").strip(),
)
return {"ok": True, "detail": "个人信息已保存", "profile": profile}
@app.get("/api/auth/challenge")
async def auth_challenge():
return {"ok": True, **_create_challenge()}
@app.post("/api/auth/register")
async def auth_register(req: AuthCredentialRequest, request: Request, response: Response):
ip = _client_ip(request)
if _hit_limit(_register_rate, f"ip:{ip}", limit=8, window_sec=600):
return {"ok": False, "detail": "请求过于频繁,请稍后再试"}
if _hit_limit(_register_rate, f"user:{(req.username or '').strip().lower()}", limit=6, window_sec=600):
return {"ok": False, "detail": "该用户名操作过于频繁,请稍后再试"}
username = (req.username or "").strip()
password = req.password or ""
ok, msg = _validate_username_password(username, password)
if not ok:
return {"ok": False, "detail": msg}
ok_challenge, challenge_msg = _verify_challenge(req)
if not ok_challenge:
return {"ok": False, "detail": challenge_msg}
try:
user = users.create_user(username, password)
except Exception as exc:
logger.exception("auth_register_failed username=%s detail=%s", username, str(exc))
return {"ok": False, "detail": "注册失败:账号库异常,请稍后重试"}
if not user:
return {"ok": False, "detail": "用户名已存在"}
users.ensure_trial_tokens(user["id"], settings.vip_trial_tokens)
ttl = _session_ttl(bool(req.remember_me))
token = users.create_session(user["id"], ttl_seconds=ttl)
response.set_cookie(
key=settings.auth_cookie_name,
value=token,
httponly=True,
samesite="lax",
max_age=ttl,
path="/",
)
return {
"ok": True,
"detail": "注册并登录成功,已赠送试用 Credits请保存重置码",
"user": {"id": user["id"], "username": user["username"]},
"reset_code": user.get("reset_code", ""),
}
@app.post("/api/auth/login")
async def auth_login(req: AuthCredentialRequest, request: Request, response: Response):
ip = _client_ip(request)
if _hit_limit(_login_rate, f"ip:{ip}", limit=20, window_sec=600):
return {"ok": False, "detail": "登录过于频繁,请稍后再试"}
if _hit_limit(_login_rate, f"user:{(req.username or '').strip().lower()}", limit=12, window_sec=600):
return {"ok": False, "detail": "该账户登录尝试过多,请稍后再试"}
try:
user = users.verify_user((req.username or "").strip(), req.password or "")
except Exception as exc:
logger.exception("auth_login_failed username=%s detail=%s", (req.username or "").strip(), str(exc))
return {"ok": False, "detail": "登录失败:账号库异常,请稍后重试"}
if not user:
return {"ok": False, "detail": "用户名或密码错误"}
ttl = _session_ttl(bool(req.remember_me))
token = users.create_session(user["id"], ttl_seconds=ttl)
response.set_cookie(
key=settings.auth_cookie_name,
value=token,
httponly=True,
samesite="lax",
max_age=ttl,
path="/",
)
return {"ok": True, "detail": "登录成功", "user": user}
@app.post("/api/auth/logout")
async def auth_logout(request: Request, response: Response):
token = request.cookies.get(settings.auth_cookie_name, "")
if token:
users.delete_session(token)
response.delete_cookie(settings.auth_cookie_name, path="/")
return {"ok": True, "detail": "已退出登录"}
@app.get("/auth/forgot", response_class=HTMLResponse)
async def forgot_password_page(request: Request):
return templates.TemplateResponse("forgot_password.html", {"request": request, "app_name": settings.app_name})
@app.post("/api/auth/password/forgot")
async def auth_forgot_password_reset(req: ForgotPasswordResetRequest):
username = (req.username or "").strip()
new_password = req.new_password or ""
if len(username) < 2:
return {"ok": False, "detail": "请输入正确的用户名"}
if len(new_password) < 6:
return {"ok": False, "detail": "新密码至少 6 个字符"}
ok = users.reset_password_by_username(username, req.reset_key, new_password)
if not ok:
return {"ok": False, "detail": "用户名或重置码错误,无法重置"}
return {"ok": True, "detail": "密码重置成功,请返回登录页重新登录"}
@app.post("/api/auth/password/change")
async def auth_change_password(req: ChangePasswordRequest, request: Request, response: Response):
user = _require_user(request)
if not user:
return {"ok": False, "detail": "请先登录"}
old_password = req.old_password or ""
new_password = req.new_password or ""
if len(old_password) < 1:
return {"ok": False, "detail": "请输入当前密码"}
if len(new_password) < 6:
return {"ok": False, "detail": "新密码至少 6 个字符"}
if old_password == new_password:
return {"ok": False, "detail": "新密码不能与当前密码相同"}
ok = users.change_password(user["id"], old_password, new_password)
if not ok:
return {"ok": False, "detail": "当前密码错误,修改失败"}
users.delete_sessions_by_user(user["id"])
ttl = _session_ttl(False)
token = users.create_session(user["id"], ttl_seconds=ttl)
response.set_cookie(
key=settings.auth_cookie_name,
value=token,
httponly=True,
samesite="lax",
max_age=ttl,
path="/",
)
return {"ok": True, "detail": "密码修改成功,已刷新登录状态"}
@app.post("/api/auth/account/delete")
async def auth_delete_account(req: DeleteAccountRequest, request: Request, response: Response):
user = _require_user(request)
if not user:
return {"ok": False, "detail": "请先登录"}
if len((req.password or "").strip()) < 1:
return {"ok": False, "detail": "请输入当前密码"}
if len((req.reset_key or "").strip()) < 4:
return {"ok": False, "detail": "请输入重置码"}
ok = users.delete_user_logically(user["id"], req.password, req.reset_key)
if not ok:
return {"ok": False, "detail": "密码或重置码错误,注销失败"}
token = request.cookies.get(settings.auth_cookie_name, "")
if token:
users.delete_session(token)
response.delete_cookie(settings.auth_cookie_name, path="/")
return {"ok": True, "detail": "账号已注销,关联数据已清空"}
@app.get("/api/auth/vip/status")
async def auth_vip_status(request: Request):
user = _require_user(request)
if not user:
return {"ok": False, "detail": "请先登录"}
return {"ok": True, "vip": users.get_vip_status(user["id"])}
@app.post("/api/auth/vip/toggle")
async def auth_vip_toggle(req: VipToggleRequest, request: Request):
user = _require_user(request)
if not user:
return {"ok": False, "detail": "请先登录"}
vip = users.set_vip_enabled(user["id"], bool(req.enabled))
return {"ok": True, "detail": "VIP 模式已更新", "vip": vip}
@app.post("/api/auth/vip/recharge")
async def auth_vip_recharge(req: VipRechargeRequest, request: Request):
token = (request.headers.get("X-Shop-Token") or "").strip()
if not settings.shop_backend_callback_token or token != settings.shop_backend_callback_token:
raise HTTPException(status_code=403, detail="直充接口已禁用,请走支付订单接口")
user = _require_user(request)
if not user:
return {"ok": False, "detail": "请先登录"}
vip = users.recharge_tokens(
user["id"],
int(req.tokens),
kind="admin_direct_recharge",
ref_type="admin",
ref_id="",
detail={"source": "legacy_api"},
)
return {"ok": True, "detail": "充值成功", "vip": vip}
@app.get("/api/billing/overview")
async def billing_overview(request: Request):
user = _require_user(request)
if not user:
return {"ok": False, "detail": "请先登录"}
return {
"ok": True,
"vip": users.get_vip_status(user["id"]),
"recharge_records": users.list_recharge_orders(user["id"], limit=30),
"consume_records": users.list_token_ledger(user["id"], limit=100),
}
@app.post("/api/billing/recharge/create")
async def billing_recharge_create(req: BillingRechargeCreateRequest, request: Request):
user = _require_user(request)
if not user:
return {"ok": False, "detail": "请先登录"}
order, pay_url = await _create_recharge_order_with_pay_url(
user_id=user["id"],
username=user["username"],
credits=int(req.tokens),
callback_url=f"{str(request.base_url).rstrip('/')}/api/pay/wechat/backcall",
order_meta={
"subscriber_name": (req.subscriber_name or "").strip(),
"subscriber_phone": (req.subscriber_phone or "").strip(),
"shipping_address": (req.shipping_address or "").strip(),
},
channel=(req.channel or "wechat").strip() or "wechat",
)
return {"ok": True, "detail": "充值订单已创建", "order": order, "pay_url": pay_url}
@app.post("/api/billing/recharge/pay-now")
async def billing_recharge_pay_now(req: BillingPayNowRequest, request: Request):
user = _require_user(request)
if not user:
return {"ok": False, "detail": "请先登录"}
order_no = (req.order_no or "").strip()
if not order_no:
return {"ok": False, "detail": "订单号不能为空"}
order = users.get_recharge_order(user["id"], order_no)
if not order:
return {"ok": False, "detail": "订单不存在"}
status = (order.get("status") or "").lower()
if status in {"paid", "success"}:
return {"ok": False, "detail": "订单已支付,无需重复支付", "order": order}
if status in {"cancelled", "closed"}:
return {"ok": False, "detail": "订单已取消,请重新创建订单", "order": order}
pay_url = await _fetch_pay_url_for_order(
order=order,
user_id=user["id"],
username=user["username"],
callback_url=f"{str(request.base_url).rstrip('/')}/api/pay/wechat/backcall",
)
return {"ok": True, "detail": "支付链接已生成", "order": order, "pay_url": pay_url}
@app.get("/api/pay/wechat/")
async def pay_wechat_ready():
return {"ok": True, "detail": "wechat pay api ready"}
@app.post("/api/pay/wechat/")
async def pay_wechat_create(req: BillingRechargeCreateRequest, request: Request):
user = _require_user(request)
if not user:
return {"ok": False, "detail": "请先登录"}
order, pay_url = await _create_recharge_order_with_pay_url(
user_id=user["id"],
username=user["username"],
credits=int(req.tokens),
callback_url=f"{str(request.base_url).rstrip('/')}/api/pay/wechat/backcall",
order_meta={
"subscriber_name": (req.subscriber_name or "").strip(),
"subscriber_phone": (req.subscriber_phone or "").strip(),
"shipping_address": (req.shipping_address or "").strip(),
},
channel=(req.channel or "wechat").strip() or "wechat",
)
return {"ok": True, "detail": "支付订单已创建", "order": order, "pay_url": pay_url}
@app.get("/api/pay/address/suggest")
async def pay_address_suggest(request: Request):
ip = _client_ip(request)
if not ip or ip in {"unknown", "127.0.0.1", "::1"}:
return {"ok": True, "detail": "本地环境,建议手动填写地址", "ip": ip or "unknown", "address": ""}
url = f"https://ip-api.com/json/{ip}?lang=zh-CN"
try:
async with httpx.AsyncClient(timeout=5) as client:
r = await client.get(url)
body = r.json() if r.content else {}
if r.status_code >= 400 or not isinstance(body, dict):
return {"ok": True, "detail": "IP地址解析失败已填入IP信息可手动修改", "ip": ip, "address": f"IP:{ip}"}
if body.get("status") != "success":
return {"ok": True, "detail": "IP地址解析失败已填入IP信息可手动修改", "ip": ip, "address": f"IP:{ip}"}
country = str(body.get("country") or "").strip()
region = str(body.get("regionName") or "").strip()
city = str(body.get("city") or "").strip()
isp = str(body.get("isp") or "").strip()
addr = "".join([x for x in [country, region, city] if x])
if isp:
addr = f"{addr}{isp}" if addr else isp
if not addr:
addr = f"IP:{ip}"
return {"ok": True, "detail": "已根据IP自动识别地址并填充可手动修改", "ip": ip, "address": addr}
except Exception:
return {"ok": True, "detail": "IP地址解析失败已填入IP信息可手动修改", "ip": ip, "address": f"IP:{ip}"}
@app.post("/api/billing/recharge/notify")
async def billing_recharge_notify(req: BillingRechargeNotifyRequest, request: Request):
token = (request.headers.get("X-Shop-Token") or "").strip()
if not settings.shop_backend_callback_token or token != settings.shop_backend_callback_token:
raise HTTPException(status_code=403, detail="forbidden")
user = _require_user(request)
uid = user["id"] if user else 0
# 回调优先根据登录态用户确认;若无登录态,可通过订单所属用户在 DB 内判断
if uid <= 0:
# 无用户态时,先查订单归属(通过 mark 接口会二次校验)
uid = -1
if (req.status or "").lower() not in {"paid", "success"}:
return {"ok": True, "detail": "ignored"}
if uid > 0:
ok, msg = users.mark_recharge_order_paid(
uid,
req.order_no,
paid_amount_cny=float(req.paid_amount_cny or 0.0),
external_txn_id=(req.external_txn_id or "").strip(),
meta={"status": req.status},
)
if not ok:
return {"ok": False, "detail": msg}
return {"ok": True, "detail": msg}
# no session callback path: iterate known users is unavailable; use direct sql in store by probing order owner
# fallback: let store resolve user by order internally via new helper-like behavior
owner_id = users.get_recharge_order_user_id(req.order_no)
if not owner_id:
return {"ok": False, "detail": "订单不存在"}
ok, msg = users.mark_recharge_order_paid(
int(owner_id),
req.order_no,
paid_amount_cny=float(req.paid_amount_cny or 0.0),
external_txn_id=(req.external_txn_id or "").strip(),
meta={"status": req.status},
)
if not ok:
return {"ok": False, "detail": msg}
return {"ok": True, "detail": msg}
@app.post("/api/pay/wechat/backcall")
async def pay_wechat_backcall(req: BillingRechargeNotifyRequest, request: Request):
configured = (settings.shop_backend_callback_token or "").strip()
provided = (request.headers.get("X-Shop-Token") or request.query_params.get("token") or "").strip()
if configured and provided != configured:
raise HTTPException(status_code=403, detail="forbidden")
if (req.status or "").lower() not in {"paid", "success"}:
return {"ok": True, "detail": "ignored"}
owner_id = users.get_recharge_order_user_id(req.order_no)
if not owner_id:
return {"ok": False, "detail": "订单不存在"}
ok, msg = users.mark_recharge_order_paid(
int(owner_id),
req.order_no,
paid_amount_cny=float(req.paid_amount_cny or 0.0),
external_txn_id=(req.external_txn_id or "").strip(),
meta={"status": req.status, "source": "wechat_backcall"},
)
if not ok:
return {"ok": False, "detail": msg}
return {"ok": True, "detail": msg}
@app.post("/api/auth/wechat/bind")
async def auth_wechat_bind(req: WechatBindingRequest, request: Request):
user = _require_user(request)
if not user:
return {"ok": False, "detail": "请先登录"}
appid = (req.appid or "").strip()
secret = (req.secret or "").strip()
if not appid or not secret:
return {"ok": False, "detail": "appid/secret 不能为空"}
created = users.add_wechat_binding(
user_id=user["id"],
account_name=(req.account_name or "").strip() or "公众号账号",
appid=appid,
secret=secret,
author=(req.author or "").strip(),
thumb_media_id=(req.thumb_media_id or "").strip(),
thumb_image_path=(req.thumb_image_path or "").strip(),
)
return {"ok": True, "detail": "公众号账号绑定成功", "account": created}
@app.post("/api/auth/wechat/switch")
async def auth_wechat_switch(req: WechatSwitchRequest, request: Request):
user = _require_user(request)
if not user:
return {"ok": False, "detail": "请先登录"}
ok = users.switch_active_wechat_binding(user["id"], int(req.account_id))
if not ok:
return {"ok": False, "detail": "切换失败:账号不存在或无权限"}
return {"ok": True, "detail": "已切换当前公众号账号"}
@app.post("/api/auth/wechat/delete")
async def auth_wechat_delete(req: WechatDeleteRequest, request: Request):
user = _require_user(request)
if not user:
return {"ok": False, "detail": "请先登录"}
ok = users.delete_wechat_binding(user["id"], int(req.account_id))
if not ok:
return {"ok": False, "detail": "删除失败:账号不存在或无权限"}
return {"ok": True, "detail": "公众号账号已删除"}
@app.post("/api/auth/ai-models/add")
async def auth_ai_model_add(req: AIModelCreateRequest, request: Request):
user = _require_user(request)
if not user:
return {"ok": False, "detail": "请先登录"}
api_key = (req.api_key or "").strip()
model = (req.model or "").strip()
if not api_key:
return {"ok": False, "detail": "API Key 不能为空"}
if not model:
return {"ok": False, "detail": "模型名不能为空"}
created = users.add_ai_model(
user_id=user["id"],
model_name=(req.model_name or "").strip(),
api_key=api_key,
base_url=(req.base_url or "").strip(),
model=model,
image_model=(req.image_model or "").strip(),
timeout_sec=max(10.0, float(req.timeout_sec or 120.0)),
max_output_tokens=max(256, int(req.max_output_tokens or 8192)),
max_retries=max(0, int(req.max_retries or 0)),
)
return {"ok": True, "detail": "模型配置已保存并设为当前", "model_config": created}
@app.post("/api/auth/ai-models/switch")
async def auth_ai_model_switch(req: AIModelSwitchRequest, request: Request):
user = _require_user(request)
if not user:
return {"ok": False, "detail": "请先登录"}
ok = users.switch_active_ai_model(user["id"], int(req.model_id))
if not ok:
return {"ok": False, "detail": "切换失败:模型不存在或无权限"}
return {"ok": True, "detail": "已切换当前模型"}
@app.post("/api/auth/ai-models/delete")
async def auth_ai_model_delete(req: AIModelDeleteRequest, request: Request):
user = _require_user(request)
if not user:
return {"ok": False, "detail": "请先登录"}
ok = users.delete_ai_model(user["id"], int(req.model_id))
if not ok:
return {"ok": False, "detail": "删除失败:模型不存在或无权限"}
return {"ok": True, "detail": "模型配置已删除"}
@app.post("/api/auth/ai-models/image-model/update")
async def auth_ai_image_model_update(req: AIImageModelUpdateRequest, request: Request):
user = _require_user(request)
if not user:
return {"ok": False, "detail": "请先登录"}
image_model = (req.image_model or "").strip()
if not image_model:
return {"ok": False, "detail": "文生图模型不能为空"}
ok = users.update_active_ai_image_model(user["id"], image_model)
if not ok:
return {"ok": False, "detail": "保存失败:请先配置文本模型"}
return {"ok": True, "detail": "文生图模型已更新"}
@app.post("/api/rewrite")
async def rewrite(req: RewriteRequest, request: Request):
rid = getattr(request.state, "request_id", "")
src = req.source_text or ""
logger.info(
"api_rewrite_in rid=%s source_chars=%d title_hint_chars=%d tone=%s audience=%s "
"keep_points_chars=%d avoid_words_chars=%d target_body_chars=%d",
rid,
len(src),
len(req.title_hint or ""),
req.tone,
req.audience,
len(req.keep_points or ""),
len(req.avoid_words or ""),
int(req.target_body_chars or 500),
)
user = _require_user(request)
if not user:
raise HTTPException(status_code=401, detail="请先登录")
model_cfg, model_source = _select_model_cfg(user["id"], prefer_vip=True)
if model_source == "vip_empty":
raise HTTPException(status_code=402, detail=_quota_detail())
if not model_cfg:
raise HTTPException(status_code=400, detail="请先在设置页配置 AI 模型")
backup = {
"openai_api_key": settings.openai_api_key,
"openai_base_url": settings.openai_base_url,
"openai_model": settings.openai_model,
"openai_timeout": settings.openai_timeout,
"openai_max_output_tokens": settings.openai_max_output_tokens,
"openai_max_retries": settings.openai_max_retries,
"openai_image_model": settings.openai_image_model,
}
try:
settings.openai_api_key = model_cfg.get("api_key") or ""
settings.openai_base_url = model_cfg.get("base_url") or ""
settings.openai_model = model_cfg.get("model") or ""
settings.openai_timeout = float(model_cfg.get("timeout_sec") or 120.0)
settings.openai_max_output_tokens = int(model_cfg.get("max_output_tokens") or 8192)
settings.openai_max_retries = int(model_cfg.get("max_retries") or 0)
result = AIRewriter().rewrite(req, request_id=rid)
finally:
settings.openai_api_key = backup["openai_api_key"]
settings.openai_base_url = backup["openai_base_url"]
settings.openai_model = backup["openai_model"]
settings.openai_timeout = backup["openai_timeout"]
settings.openai_max_output_tokens = backup["openai_max_output_tokens"]
settings.openai_max_retries = backup["openai_max_retries"]
settings.openai_image_model = backup["openai_image_model"]
usage = ((result.trace or {}).get("usage") or {}) if isinstance(result.trace, dict) else {}
prompt_tokens = int(usage.get("prompt_tokens") or 0)
completion_tokens = int(usage.get("completion_tokens") or 0)
total_tokens = int(usage.get("total_tokens") or 0)
billed_basis = "usage_tokens" if total_tokens > 0 else "char_estimate"
token_cost = _estimate_rewrite_cost(req, result)
vip_status = users.get_vip_status(user["id"])
should_consume = bool(vip_status.get("vip_enabled"))
if should_consume:
ok_cost, balance = users.consume_tokens(
user["id"],
token_cost,
kind="rewrite",
ref_type="request",
ref_id=rid,
detail={
"source_chars": len((req.source_text or "").strip()),
"target_body_chars": int(req.target_body_chars or 0),
"title_chars": len((result.title or "").strip()),
"summary_chars": len((result.summary or "").strip()),
"body_chars": len((result.body_markdown or "").strip()),
"model": model_cfg.get("model") if model_cfg else "",
"prompt_tokens": prompt_tokens,
"completion_tokens": completion_tokens,
"total_tokens": total_tokens,
"credits_rule": (
f"1000000 tokens={float(settings.credits_token_price_per_million_cny):.2f}元,"
f"图片{int(settings.credits_image_price_package_images)}张={float(settings.credits_image_price_package_cny):.2f}元,"
f"{int(settings.credits_recharge_package_credits)}Credits={float(settings.credits_recharge_package_amount):.2f}"
),
"billed_basis": billed_basis,
},
)
if not ok_cost:
raise HTTPException(status_code=402, detail=_quota_detail())
if result.trace is None:
result.trace = {}
result.trace["vip"] = {
"model_source": "platform" if model_source == "vip" else "user",
"credits_cost": token_cost,
"credits_balance": balance,
"billed_basis": billed_basis,
"total_tokens": total_tokens,
}
tr = result.trace or {}
logger.info(
"api_rewrite_out rid=%s mode=%s duration_ms=%s quality_notes=%d trace_steps=%s soft_accept=%s",
rid,
result.mode,
tr.get("duration_ms"),
len(result.quality_notes or []),
len((tr.get("steps") or [])),
tr.get("quality_soft_accept"),
)
return result
@app.post("/api/publish/wechat")
async def publish_wechat(req: WechatPublishRequest, request: Request):
user = _require_user(request)
if not user:
return {"ok": False, "detail": "请先登录"}
binding = users.get_active_wechat_binding(user["id"])
if not binding:
return {"ok": False, "detail": "当前账号未绑定公众号 token请先在页面绑定"}
rid = getattr(request.state, "request_id", "")
logger.info(
"api_wechat_in rid=%s title_chars=%d summary_chars=%d body_md_chars=%d author_set=%s",
rid,
len(req.title or ""),
len(req.summary or ""),
len(req.body_markdown or ""),
bool((req.author or "").strip()),
)
out = await wechat.publish_draft(req, request_id=rid, account=binding)
wcode = (out.data or {}).get("errcode") if isinstance(out.data, dict) else None
logger.info(
"api_wechat_out rid=%s ok=%s wechat_errcode=%s detail_preview=%s",
rid,
out.ok,
wcode,
(out.detail or "")[:240],
)
return out
@app.post("/api/wechat/cover/upload")
async def upload_wechat_cover(request: Request, file: UploadFile = File(...)):
user = _require_user(request)
if not user:
return {"ok": False, "detail": "请先登录"}
binding = users.get_active_wechat_binding(user["id"])
if not binding:
return {"ok": False, "detail": "当前账号未绑定公众号 token请先在页面绑定"}
rid = getattr(request.state, "request_id", "")
fn = file.filename or "cover.jpg"
content = await file.read()
logger.info("api_wechat_cover_upload_in rid=%s filename=%s bytes=%d", rid, fn, len(content))
out = await wechat.upload_cover(fn, content, request_id=rid, account=binding)
logger.info(
"api_wechat_cover_upload_out rid=%s ok=%s detail=%s",
rid,
out.ok,
(out.detail or "")[:160],
)
return out
@app.post("/api/wechat/cover/upload-by-url")
async def upload_wechat_cover_by_url(req: WechatCoverUploadByUrlRequest, request: Request):
user = _require_user(request)
if not user:
return {"ok": False, "detail": "请先登录"}
binding = users.get_active_wechat_binding(user["id"])
if not binding:
return {"ok": False, "detail": "当前账号未绑定公众号 token请先在页面绑定"}
rid = getattr(request.state, "request_id", "")
image_url = (req.image_url or "").strip()
if not image_url:
return {"ok": False, "detail": "图片 URL 不能为空"}
parsed = urlparse(image_url)
if parsed.scheme not in {"http", "https"}:
return {"ok": False, "detail": "仅支持 http/https 图片地址"}
try:
async with httpx.AsyncClient(timeout=30, follow_redirects=True) as client:
resp = await client.get(image_url)
except Exception as exc:
logger.warning("api_wechat_cover_url_fetch_fail rid=%s url=%s err=%s", rid, image_url, exc)
return {"ok": False, "detail": "图片下载失败,请检查 URL 是否可访问"}
if resp.status_code >= 400:
return {"ok": False, "detail": f"图片下载失败HTTP {resp.status_code}"}
content_type = (resp.headers.get("content-type") or "").lower()
if not content_type.startswith("image/"):
return {"ok": False, "detail": "URL 指向的内容不是图片"}
content = resp.content or b""
if not content:
return {"ok": False, "detail": "图片内容为空"}
if len(content) > 10 * 1024 * 1024:
return {"ok": False, "detail": "图片过大,请使用 10MB 以内的图片"}
ext = Path(parsed.path or "").suffix.lower()
if ext not in {".jpg", ".jpeg", ".png", ".webp"}:
ext = ".png" if "png" in content_type else ".jpg"
fn = f"cover_from_url{ext}"
logger.info("api_wechat_cover_upload_url_in rid=%s url=%s bytes=%d", rid, image_url, len(content))
out = await wechat.upload_cover(fn, content, request_id=rid, account=binding)
logger.info(
"api_wechat_cover_upload_url_out rid=%s ok=%s detail=%s",
rid,
out.ok,
(out.detail or "")[:160],
)
return out
@app.post("/api/wechat/cover/generate")
async def generate_wechat_cover(req: WechatCoverGenerateRequest, request: Request):
user = _require_user(request)
if not user:
return {"ok": False, "detail": "请先登录"}
binding = users.get_active_wechat_binding(user["id"]) if req.upload_to_wechat else None
if req.upload_to_wechat and not binding:
return {"ok": False, "detail": "当前账号未绑定公众号 token请先在页面绑定"}
rid = getattr(request.state, "request_id", "")
logger.info(
"api_wechat_cover_generate_in rid=%s title_chars=%d summary_chars=%d upload_to_wechat=%s",
rid,
len(req.title or ""),
len(req.summary or ""),
req.upload_to_wechat,
)
model_cfg, model_source = _select_model_cfg(user["id"], prefer_vip=True)
if model_source == "vip_empty":
return {"ok": False, "detail": _quota_detail(), "upgrade_required": True}
image_model_override = (req.image_model or "").strip()
backup = {
"openai_api_key": settings.openai_api_key,
"openai_base_url": settings.openai_base_url,
"openai_model": settings.openai_model,
"openai_timeout": settings.openai_timeout,
"openai_max_output_tokens": settings.openai_max_output_tokens,
"openai_max_retries": settings.openai_max_retries,
"openai_image_model": settings.openai_image_model,
}
try:
if model_cfg:
settings.openai_api_key = model_cfg.get("api_key") or ""
settings.openai_base_url = model_cfg.get("base_url") or ""
settings.openai_model = model_cfg.get("model") or ""
settings.openai_timeout = float(model_cfg.get("timeout_sec") or 120.0)
settings.openai_max_output_tokens = int(model_cfg.get("max_output_tokens") or 8192)
settings.openai_max_retries = int(model_cfg.get("max_retries") or 0)
settings.openai_image_model = (model_cfg.get("image_model") or "").strip() or backup["openai_image_model"]
else:
settings.openai_api_key = ""
settings.openai_base_url = ""
settings.openai_model = ""
settings.openai_timeout = 120.0
settings.openai_max_output_tokens = 8192
settings.openai_max_retries = 0
settings.openai_image_model = backup["openai_image_model"]
if image_model_override:
settings.openai_image_model = image_model_override
out = await PosterMaterialService(wechat).generate_cover(req, request_id=rid, account=binding)
finally:
settings.openai_api_key = backup["openai_api_key"]
settings.openai_base_url = backup["openai_base_url"]
settings.openai_model = backup["openai_model"]
settings.openai_timeout = backup["openai_timeout"]
settings.openai_max_output_tokens = backup["openai_max_output_tokens"]
settings.openai_max_retries = backup["openai_max_retries"]
settings.openai_image_model = backup["openai_image_model"]
logger.info(
"api_wechat_cover_generate_out rid=%s ok=%s thumb=%s note=%s warnings=%d",
rid,
out.ok,
bool(out.thumb_media_id),
out.note,
len(out.warnings),
)
if out.ok and model_source == "vip":
token_cost = _estimate_image_cost(1)
ok_cost, balance = users.consume_tokens(
user["id"],
token_cost,
kind="cover_generate",
ref_type="request",
ref_id=rid,
detail={
"title": (req.title or "")[:120],
"style_hint": (req.style_hint or "")[:120],
"image_count": 1,
"image_model": image_model_override or settings.openai_image_model,
"image_price_package_cny": float(settings.credits_image_price_package_cny),
"image_price_package_images": int(settings.credits_image_price_package_images),
"credits_rule": (
f"图片{int(settings.credits_image_price_package_images)}张={float(settings.credits_image_price_package_cny):.2f}元,"
f"{int(settings.credits_recharge_package_credits)}Credits={float(settings.credits_recharge_package_amount):.2f}"
),
},
)
if not ok_cost:
return {"ok": False, "detail": _quota_detail(), "upgrade_required": True}
out.warnings = list(out.warnings or [])
out.warnings.append(f"已扣减 {token_cost} Credits可用余额 {balance}")
return out
@app.post("/api/wechat/material/upload")
async def upload_wechat_material(request: Request, file: UploadFile = File(...)):
user = _require_user(request)
if not user:
return {"ok": False, "detail": "请先登录"}
binding = users.get_active_wechat_binding(user["id"])
if not binding:
return {"ok": False, "detail": "当前账号未绑定公众号 token请先在页面绑定"}
rid = getattr(request.state, "request_id", "")
fn = file.filename or "material.jpg"
content = await file.read()
logger.info("api_wechat_material_upload_in rid=%s filename=%s bytes=%d", rid, fn, len(content))
out = await wechat.upload_body_material(fn, content, request_id=rid, account=binding)
logger.info(
"api_wechat_material_upload_out rid=%s ok=%s detail=%s",
rid,
out.ok,
(out.detail or "")[:160],
)
return out
@app.post("/api/material/posters/generate")
async def generate_posters(req: PosterGenerateRequest, request: Request):
user = _require_user(request)
if not user:
return {"ok": False, "detail": "请先登录"}
binding = users.get_active_wechat_binding(user["id"]) if req.upload_to_wechat else None
if req.upload_to_wechat and not binding:
return {"ok": False, "detail": "当前账号未绑定公众号 token请先在页面绑定"}
rid = getattr(request.state, "request_id", "")
logger.info(
"api_poster_generate_in rid=%s body_chars=%d upload_to_wechat=%s max_images=%d",
rid,
len(req.body_markdown or ""),
req.upload_to_wechat,
int(req.max_images or 0),
)
model_cfg, model_source = _select_model_cfg(user["id"], prefer_vip=True)
if model_source == "vip_empty":
return {"ok": False, "detail": _quota_detail(), "upgrade_required": True}
image_model_override = (req.image_model or "").strip()
backup = {
"openai_api_key": settings.openai_api_key,
"openai_base_url": settings.openai_base_url,
"openai_model": settings.openai_model,
"openai_timeout": settings.openai_timeout,
"openai_max_output_tokens": settings.openai_max_output_tokens,
"openai_max_retries": settings.openai_max_retries,
"openai_image_model": settings.openai_image_model,
}
try:
if model_cfg:
settings.openai_api_key = model_cfg.get("api_key") or ""
settings.openai_base_url = model_cfg.get("base_url") or ""
settings.openai_model = model_cfg.get("model") or ""
settings.openai_timeout = float(model_cfg.get("timeout_sec") or 120.0)
settings.openai_max_output_tokens = int(model_cfg.get("max_output_tokens") or 8192)
settings.openai_max_retries = int(model_cfg.get("max_retries") or 0)
settings.openai_image_model = (model_cfg.get("image_model") or "").strip() or backup["openai_image_model"]
else:
settings.openai_api_key = ""
settings.openai_base_url = ""
settings.openai_model = ""
settings.openai_timeout = 120.0
settings.openai_max_output_tokens = 8192
settings.openai_max_retries = 0
settings.openai_image_model = backup["openai_image_model"]
if image_model_override:
settings.openai_image_model = image_model_override
out = await PosterMaterialService(wechat).generate(req, request_id=rid, account=binding)
finally:
settings.openai_api_key = backup["openai_api_key"]
settings.openai_base_url = backup["openai_base_url"]
settings.openai_model = backup["openai_model"]
settings.openai_timeout = backup["openai_timeout"]
settings.openai_max_output_tokens = backup["openai_max_output_tokens"]
settings.openai_max_retries = backup["openai_max_retries"]
settings.openai_image_model = backup["openai_image_model"]
logger.info(
"api_poster_generate_out rid=%s ok=%s posters=%d warnings=%d",
rid,
out.ok,
len(out.posters),
len(out.warnings),
)
if out.ok and model_source == "vip":
image_count = max(0, len(out.posters or []))
token_cost = _estimate_image_cost(image_count)
ok_cost, balance = users.consume_tokens(
user["id"],
token_cost,
kind="poster_generate",
ref_type="request",
ref_id=rid,
detail={
"image_count": image_count,
"body_chars": len((req.body_markdown or "").strip()),
"max_images": int(req.max_images or 0),
"image_model": image_model_override or settings.openai_image_model,
"image_price_package_cny": float(settings.credits_image_price_package_cny),
"image_price_package_images": int(settings.credits_image_price_package_images),
"credits_rule": (
f"图片{int(settings.credits_image_price_package_images)}张={float(settings.credits_image_price_package_cny):.2f}元,"
f"{int(settings.credits_recharge_package_credits)}Credits={float(settings.credits_recharge_package_amount):.2f}"
),
},
)
if not ok_cost:
return {"ok": False, "detail": _quota_detail(), "upgrade_required": True}
out.warnings = list(out.warnings or [])
out.warnings.append(f"已扣减 {token_cost} Credits可用余额 {balance}")
return out
@app.post("/api/publish/im")
async def publish_im(req: IMPublishRequest, request: Request):
rid = getattr(request.state, "request_id", "")
logger.info(
"api_im_in rid=%s title_chars=%d body_md_chars=%d",
rid,
len(req.title or ""),
len(req.body_markdown or ""),
)
out = await im.publish(req, request_id=rid)
logger.info("api_im_out rid=%s ok=%s detail=%s", rid, out.ok, (out.detail or "")[:120])
return out