from __future__ import annotations import difflib import json import logging import re import time from typing import Any from textwrap import shorten from urllib.parse import urlparse from openai import OpenAI from app.config import settings from app.schemas import RewriteRequest, RewriteResponse logger = logging.getLogger(__name__) def _api_host(url: str | None) -> str: if not url: return "" try: return urlparse(url).netloc or "" except Exception: return "" def _is_likely_timeout_error(exc: BaseException) -> bool: n = type(exc).__name__.lower() if "timeout" in n: return True s = str(exc).lower() return "timed out" in s or "timeout" in s # 短文洗稿:5 个自然段、正文总字数上限(含标点) MAX_BODY_CHARS = 500 MIN_BODY_CHARS = 80 def _preview_for_log(text: str, limit: int = 400) -> str: t = (text or "").replace("\r\n", "\n").replace("\n", " ").strip() if len(t) <= limit: return t return t[: limit - 1] + "…" SYSTEM_PROMPT = """ 你是资深中文科普类公众号编辑,擅长把长文、线程贴改写成**极短、好读**的推送。 目标:在**不偏离原意**的前提下,用最少字数讲清一件事;不要写成技术方案、长文大纲或带很多小标题的文章。 硬性规则: 1) **忠实原意**:只概括、转述原文已有信息,不编造事实,不偷换主题; 2) 语气通俗、干脆,避免套话堆砌; 3) 只输出合法 JSON:title, summary, body_markdown; 4) **body_markdown 约束**:恰好 **5 个自然段**;段与段之间用一个空行分隔;**不要**使用 # / ## 标题符号;全文(正文)总字数 **不超过 500 字**(含标点); 5) title、summary 也要短:标题约 8~18 字;摘要约 40~80 字; 6) 正文每段需首行缩进(建议段首使用两个全角空格「  」),避免顶格; 7) 关键观点需要加粗:请用 Markdown `**加粗**` 标出 2~4 个重点短语; 8) JSON 字符串内引号请用「」或『』,勿用未转义的英文 "。 """.strip() REWRITE_SCHEMA_HINT = """ 请输出 JSON(勿包在 ``` 里),例如: { "title": "短标题,点明主题", "summary": "一句话到两句话摘要", "body_markdown": "第一段内容…\\n\\n第二段…\\n\\n第三段…\\n\\n第四段…\\n\\n第五段…" } body_markdown 写法: - 必须且只能有 **5 段**:每段若干完整句子,段之间 **\\n\\n**(空一行); - **禁止** markdown 标题(不要用 #); - 正文总长 **≤500 字**,宁可短而清楚,不要写满废话; - 每段段首请保留首行缩进(两个全角空格「  」); - 请用 `**...**` 加粗 2~4 个关键观点词; - 内容顺序建议:第 1 段交代在说什么;中间 3 段展开关键信息;最后 1 段收束或提醒(均须紧扣原文,勿乱发挥)。 """.strip() # 通义等模型若首次过短/结构不对,再要一次 _JSON_BODY_TOO_SHORT_RETRY = """ 【系统复检】上一次 body_markdown 不符合要求。请重输出**完整** JSON: - 正文必须 **恰好 5 个自然段**(仅 \\n\\n 分段),无 # 标题,总字数 **≤500 字**; - 忠实原稿、简短高效; - 引号只用「」『』; - 只输出 JSON。 """.strip() class AIRewriter: def __init__(self) -> None: self._client = None self._prefer_chat_first = False if settings.openai_api_key: base_url = settings.openai_base_url or "" self._prefer_chat_first = "dashscope.aliyuncs.com" in base_url self._client = OpenAI( api_key=settings.openai_api_key, base_url=settings.openai_base_url, timeout=settings.openai_timeout, max_retries=max(0, int(settings.openai_max_retries)), ) logger.info( "AIRewriter_init model=%s api_host=%s prefer_chat_first=%s timeout_s=%s max_retries=%s", settings.openai_model, _api_host(settings.openai_base_url) or "(default)", self._prefer_chat_first, settings.openai_timeout, settings.openai_max_retries, ) else: logger.warning("AIRewriter_init openai_key_missing=1 rewrite_will_use_fallback_only=1") def rewrite(self, req: RewriteRequest, request_id: str = "") -> RewriteResponse: cleaned_source = self._clean_source(req.source_text) started = time.monotonic() trace: dict[str, Any] = { "request_id": request_id or None, "model": settings.openai_model, "provider": "dashscope" if self._prefer_chat_first else "openai_compatible", "source_chars_in": len(req.source_text or ""), "cleaned_chars": len(cleaned_source), "openai_timeout_env_sec": settings.openai_timeout, "steps": [], } def _step(name: str, **extra: Any) -> None: elapsed_ms = round((time.monotonic() - started) * 1000, 1) trace["steps"].append({"name": name, "elapsed_ms": elapsed_ms, **extra}) extra_fmt = "" if extra: parts: list[str] = [] for k, v in extra.items(): s = repr(v) if len(s) > 200: s = s[:197] + "..." parts.append(f"{k}={s}") extra_fmt = " " + " ".join(parts) logger.info( "rewrite_step rid=%s step=%s elapsed_ms=%s%s", request_id or "-", name, elapsed_ms, extra_fmt, ) raw_in = (req.source_text or "").replace("\r\n", "\n").strip() _step("clean_source", truncated=len(cleaned_source) < len(raw_in)) logger.info( "rewrite_enter rid=%s model=%s client_ok=%s prefer_chat_first=%s " "source_chars=%d cleaned_chars=%d ai_soft_accept=%s", request_id or "-", settings.openai_model, bool(self._client), self._prefer_chat_first, trace["source_chars_in"], len(cleaned_source), settings.ai_soft_accept, ) # Primary: model rewrite + quality gate + optional second-pass polish. if self._client: # 通义长文 JSON 常需 40~90s+。旧代码错误地将首轮 cap 在 30s → APITimeoutError → 仅走兜底。 if self._prefer_chat_first: first_pass_timeout = max(45.0, min(300.0, float(settings.openai_timeout))) else: first_pass_timeout = max(20.0, min(120.0, float(settings.openai_timeout))) trace["first_pass_http_timeout_sec"] = round(first_pass_timeout, 1) logger.info( "rewrite_model_first_pass rid=%s first_pass_http_timeout_s=%.1f openai_timeout_env_s=%.1f " "lenient_qa=%s note=dashscope_uses_full_openai_timeout_not_capped_30", request_id or "-", first_pass_timeout, settings.openai_timeout, self._prefer_chat_first, ) t0 = time.monotonic() draft = self._model_rewrite(req, cleaned_source, timeout_sec=first_pass_timeout, request_id=request_id) _step( "model_first_pass", duration_ms=round((time.monotonic() - t0) * 1000, 1), ok=bool(draft), timeout_sec=first_pass_timeout, ) if not draft: trace["quality_issues_final"] = ["模型未返回有效 JSON 或请求超时"] trace["model_unavailable_hint"] = ( "排查:① 日志是否 APITimeoutError → 提高 OPENAI_TIMEOUT(通义建议 120~180)并确认 " "first_pass_http_timeout_sec 与 trace.openai_timeout_env_sec 一致;② 网络到 " "dashscope.aliyuncs.com;③ 见 model_call_fail 的 is_likely_timeout。" ) _step("model_first_pass_failed", detail="timeout_or_invalid_json") if draft: normalized = self._normalize_result(draft) issues = self._quality_issues( req, cleaned_source, normalized, lenient=self._prefer_chat_first ) trace["quality_issues_first"] = issues logger.info( "rewrite quality check rid=%s first_issues=%s body_chars=%d", request_id, issues, len(normalized.get("body_markdown", "") or ""), ) elapsed = time.monotonic() - started remaining_budget = max(0.0, (first_pass_timeout + 25.0) - elapsed) polish_budget = min(22.0, remaining_budget) if self._prefer_chat_first else min(30.0, remaining_budget) if issues and not ( remaining_budget >= 8.0 and polish_budget >= 6.0 ): logger.info( "rewrite_polish_skipped rid=%s first_issues=%d remaining_budget_s=%.1f polish_budget_s=%.1f", request_id or "-", len(issues), remaining_budget, polish_budget, ) if issues and remaining_budget >= 8.0 and polish_budget >= 6.0: t1 = time.monotonic() polished = self._model_polish( req, cleaned_source, normalized, issues, timeout_sec=polish_budget, request_id=request_id, ) _step( "model_polish", duration_ms=round((time.monotonic() - t1) * 1000, 1), ok=bool(polished), ) if polished: normalized = self._normalize_result(polished) final_issues = self._quality_issues( req, cleaned_source, normalized, lenient=self._prefer_chat_first ) trace["quality_issues_final"] = final_issues if not final_issues: trace["duration_ms"] = round((time.monotonic() - started) * 1000, 1) trace["mode"] = "ai" logger.info( "rewrite success rid=%s duration_ms=%.1f mode=ai", request_id, trace["duration_ms"], ) return RewriteResponse(**normalized, mode="ai", quality_notes=[], trace=trace) # 模型已返回有效 JSON:默认「软接受」——仍视为 AI 洗稿,质检问题写入 quality_notes,避免误用模板稿 if settings.ai_soft_accept and self._model_output_usable(normalized): trace["duration_ms"] = round((time.monotonic() - started) * 1000, 1) trace["mode"] = "ai" trace["quality_soft_accept"] = True trace["quality_warnings"] = final_issues logger.warning( "rewrite soft-accept rid=%s warnings=%s body_chars=%d", request_id, final_issues, len(normalized.get("body_markdown", "") or ""), ) return RewriteResponse( **normalized, mode="ai", quality_notes=final_issues, trace=trace, ) logger.warning( "rewrite quality gate fallback rid=%s issues=%s", request_id, final_issues, ) _step("quality_gate_failed", issues=final_issues) else: _step("skip_model", reason="OPENAI_API_KEY 未配置") trace["quality_issues_final"] = ["未配置 OPENAI_API_KEY,使用本地保底稿"] # Secondary: deterministic fallback with publishable structure. reason = "模型未返回有效 JSON、超时,或质量未达标,已使用结构化保底稿" trace["duration_ms"] = round((time.monotonic() - started) * 1000, 1) logger.info( "rewrite fallback rid=%s duration_ms=%.1f last_issues=%s", request_id, trace["duration_ms"], trace.get("quality_issues_final"), ) return self._fallback_rewrite(req, cleaned_source, reason=reason, trace=trace) def _model_rewrite( self, req: RewriteRequest, cleaned_source: str, timeout_sec: float, request_id: str = "" ) -> dict | None: user_prompt = self._build_user_prompt(req, cleaned_source) return self._call_model_json(user_prompt, timeout_sec=timeout_sec, request_id=request_id) def _model_polish( self, req: RewriteRequest, cleaned_source: str, normalized: dict, issues: list[str], timeout_sec: float, request_id: str = "", ) -> dict | None: issue_text = "\n".join([f"- {i}" for i in issues]) user_prompt = f""" 你上一次的改写稿未通过质检,请针对下列问题重写;体裁仍为**科普介绍类公众号**,**忠实原稿**,不要写成技术方案或内部汇报。 {issue_text} 原始内容: {cleaned_source} 上一次草稿: 标题:{normalized.get('title', '')} 摘要:{normalized.get('summary', '')} 正文: {normalized.get('body_markdown', '')} 用户改写偏好: - 标题参考:{req.title_hint or '自动生成'} - 语气风格:{req.tone} - 目标读者:{req.audience} - 必须保留观点:{req.keep_points or '无'} - 避免词汇:{req.avoid_words or '无'} 请输出一版全新稿件。{REWRITE_SCHEMA_HINT} """.strip() return self._call_model_json(user_prompt, timeout_sec=timeout_sec, request_id=request_id) def _build_user_prompt(self, req: RewriteRequest, cleaned_source: str) -> str: return f""" 原始内容(已清洗): {cleaned_source} 用户改写偏好: - 标题参考:{req.title_hint or '自动生成'} - 语气风格:{req.tone} - 目标读者:{req.audience} - 必须保留观点:{req.keep_points or '无'} - 避免词汇:{req.avoid_words or '无'} 任务:在**不偏离原帖主题与事实**的前提下,改写成科普介绍风格的公众号正文(好读、讲清楚,而非技术实施方案)。{REWRITE_SCHEMA_HINT} """.strip() def _fallback_rewrite( self, req: RewriteRequest, cleaned_source: str, reason: str, trace: dict[str, Any] | None = None ) -> RewriteResponse: sentences = self._extract_sentences(cleaned_source) points = self._pick_key_points(sentences, limit=5) title = req.title_hint.strip() or self._build_fallback_title(sentences) summary = self._build_fallback_summary(points, cleaned_source) analysis = self._build_analysis(points) conclusion = "细节仍以原帖为准;若话题在更新,请对照出处核对。" def _one_line(s: str, n: int) -> str: t = re.sub(r"\s+", " ", (s or "").strip()) return t if len(t) <= n else t[: n - 1] + "…" paras = [ _one_line(self._build_intro(points, cleaned_source), 105), _one_line(analysis["cause"], 105), _one_line(analysis["impact"], 105), _one_line(analysis["risk"], 105), _one_line(conclusion, 105), ] body = "\n\n".join(paras) if len(body) > MAX_BODY_CHARS: body = body[: MAX_BODY_CHARS - 1] + "…" normalized = { "title": title, "summary": summary, "body_markdown": self._format_markdown(body), } if trace is not None: trace["mode"] = "fallback" trace["fallback_reason"] = reason rid = (trace or {}).get("request_id") or "-" logger.info( "rewrite_fallback_compose rid=%s reason=%s title_chars=%d summary_chars=%d body_chars=%d points=%d", rid, reason[:120], len(normalized["title"]), len(normalized["summary"]), len(normalized["body_markdown"]), len(points), ) return RewriteResponse(**normalized, mode="fallback", quality_notes=[reason], trace=trace) def _build_fallback_title(self, sentences: list[str]) -> str: seed = sentences[0] if sentences else "内容导读" seed = shorten(seed, width=16, placeholder="") return f"{seed}:一文读懂在说什么" def _build_fallback_summary(self, points: list[str], source: str) -> str: if len(points) >= 2: return shorten( f"原帖在谈:{points[0]};另一点:{points[1]}。", width=85, placeholder="…", ) return shorten(re.sub(r"\s+", " ", source), width=85, placeholder="…") def _build_intro(self, points: list[str], source: str) -> str: focus = points[0] if points else shorten(source, width=42, placeholder="...") return ( f"原帖主要在谈:{focus}。下面用更适合公众号阅读的方式,把脉络和重点捋清楚,方便你快速抓住作者在表达什么。\n\n" "说明:这是基于原文的导读式整理,若需引用细节,请以原帖为准。" ) def _build_analysis(self, points: list[str]) -> dict[str, str]: p1 = points[0] if points else "原文讨论的核心现象" p2 = points[1] if len(points) > 1 else "与读者日常能感知到的关联" p3 = points[2] if len(points) > 2 else "原文可能提到的限制或尚未定论之处" return { "cause": ( f"先把事情放在原文的语境里理解:{p1}。" "这里侧重讲清楚「作者在说什么」,而不是替原文下结论。" ), "impact": ( f"对大多数读者来说,更关心的是:这和自己有什么关系。{p2}。" "若原帖偏专业,这里尽量用通俗说法转述,避免写成给决策层的公文。" ), "risk": ( f"任何公开讨论都有边界:{p3}。" "若话题仍在变化,结论可能更新,阅读时建议保留一点审慎,必要时回看原始出处。" ), } def _clean_source(self, text: str) -> str: src = (text or "").replace("\r\n", "\n").strip() src = re.sub(r"https?://\S+", "", src) src = re.sub(r"(?m)^\s*>+\s*", "", src) src = re.sub(r"(?m)^\s*[@#][^\s]+\s*$", "", src) src = re.sub(r"\n{3,}", "\n\n", src) src = re.sub(r"\s+", " ", src) src = src.strip() max_chars = max(1200, settings.openai_source_max_chars) if len(src) > max_chars: src = src[:max_chars] + " ...(原文过长,已截断后改写)" return src def _extract_sentences(self, text: str) -> list[str]: parts = re.split(r"[。!?;;.!?\n]+", text) cleaned = [p.strip(" ,,;;::。") for p in parts if p.strip()] return cleaned def _pick_key_points(self, sentences: list[str], limit: int) -> list[str]: points: list[str] = [] templates = [ "值得关注:{}", "背景要点:{}", "原文强调:{}", "延伸信息:{}", "阅读提示:{}", ] for s in sentences: if len(s) < 12: continue if len(points) >= limit: break normalized = re.sub(r"^(第一|第二|第三|第四|第五)[,,::]?", "", s).strip() normalized = re.sub(r"^[-•\\d\\.\\)\\s]+", "", normalized) text = shorten(normalized, width=50, placeholder="...") points.append(templates[len(points) % len(templates)].format(text)) if not points: points = ["原始内容信息密度较高,建议先聚焦一个核心问题再展开"] return points def _parse_response_json(self, text: str) -> dict: raw = (text or "").strip() if not raw: raise ValueError("empty model output") try: return json.loads(raw) except json.JSONDecodeError: pass fenced = re.sub(r"^```(?:json)?\s*|\s*```$", "", raw, flags=re.IGNORECASE).strip() if fenced != raw: try: return json.loads(fenced) except json.JSONDecodeError: pass start = raw.find("{") end = raw.rfind("}") if start != -1 and end != -1 and end > start: return json.loads(raw[start : end + 1]) raise ValueError("model output is not valid JSON") def _chat_completions_json(self, user_prompt: str, timeout_sec: float, request_id: str) -> dict | None: """chat.completions:通义兼容层在 json_object 下易产出极短 JSON,故 DashScope 不传 response_format,并支持短文自动重试。""" max_attempts = 2 if self._prefer_chat_first else 1 deadline = time.monotonic() + max(0.0, timeout_sec) pe = user_prompt for attempt in range(max_attempts): if attempt == 1: pe = user_prompt + _JSON_BODY_TOO_SHORT_RETRY remaining = deadline - time.monotonic() if remaining <= 0: logger.warning( "model_call_budget_exhausted rid=%s api=chat.completions attempt=%d/%d", request_id or "-", attempt + 1, max_attempts, ) return None try: logger.info( "model_call_try rid=%s api=chat.completions.create attempt=%d/%d max_tokens=%d json_object=%s timeout_s=%.1f", request_id or "-", attempt + 1, max_attempts, settings.openai_max_output_tokens, not self._prefer_chat_first, remaining, ) t0 = time.monotonic() create_kwargs: dict[str, Any] = { "model": settings.openai_model, "messages": [ {"role": "system", "content": SYSTEM_PROMPT}, {"role": "user", "content": pe}, ], "max_tokens": settings.openai_max_output_tokens, "temperature": 0.4, "extra_body": {"enable_thinking": False}, "timeout": remaining, } # OpenAI 官方 API 在 json_object 下表现稳定;通义兼容模式若开启则常出现正文被压成一两百字。 if not self._prefer_chat_first: create_kwargs["response_format"] = {"type": "json_object"} completion = self._client.chat.completions.create(**create_kwargs) except Exception as exc: is_to = _is_likely_timeout_error(exc) logger.warning( "model_call_fail rid=%s api=chat.completions attempt=%d/%d exc_type=%s exc=%s " "is_likely_timeout=%s http_timeout_budget_s=%.1f openai_timeout_env_s=%.1f max_tokens=%d " "hint=%s", request_id or "-", attempt + 1, max_attempts, type(exc).__name__, exc, is_to, remaining, settings.openai_timeout, settings.openai_max_output_tokens, ( "典型原因:单轮 HTTP 等待短于模型生成长文 JSON 所需时间;已取消错误的 30s 上限," "请确认 OPENAI_TIMEOUT>=120 并重启进程。" ) if is_to and self._prefer_chat_first else ( "若为超时:增大 OPENAI_TIMEOUT;否则检查 Key/模型名/网络。" if is_to else "" ), ) if self._prefer_chat_first: return None raise choice = completion.choices[0] if completion.choices else None msg = (choice.message.content if choice else "") or "" fr = getattr(choice, "finish_reason", None) if choice else None usage = getattr(completion, "usage", None) udump = ( usage.model_dump() if usage is not None and hasattr(usage, "model_dump") else usage ) ms = (time.monotonic() - t0) * 1000 logger.info( "model_call_ok rid=%s api=chat.completions attempt=%d duration_ms=%.1f output_chars=%d " "finish_reason=%s usage=%s preview=%s", request_id or "-", attempt + 1, ms, len(msg), fr, udump, _preview_for_log(msg, 380), ) logger.debug( "model_call_raw rid=%s api=chat.completions attempt=%d body=%s", request_id or "-", attempt + 1, msg, ) try: parsed = self._parse_response_json(msg) except Exception as exc: logger.warning( "model_json_parse_fail rid=%s attempt=%d err=%s", request_id or "-", attempt + 1, exc, ) if not self._prefer_chat_first: raise if attempt == max_attempts - 1: return None continue raw_body = str(parsed.get("body_markdown", "")).strip() bl = len(raw_body) pc = len([p for p in re.split(r"\n\s*\n", raw_body) if p.strip()]) if self._prefer_chat_first and attempt == 0 and (bl < 40 or pc < 3): logger.warning( "model_body_retry rid=%s body_chars=%d paragraphs=%d reason=too_thin_or_not_segmented", request_id or "-", bl, pc, ) continue return parsed return None def _call_model_json(self, user_prompt: str, timeout_sec: float, request_id: str = "") -> dict | None: methods = ["chat", "responses"] if self._prefer_chat_first else ["responses", "chat"] logger.info( "model_call_begin rid=%s model=%s timeout_s=%.1f prefer_chat_first=%s prompt_chars=%d " "try_order=%s", request_id or "-", settings.openai_model, timeout_sec, self._prefer_chat_first, len(user_prompt), methods, ) for method in methods: t0 = time.monotonic() if method == "responses": try: logger.info("model_call_try rid=%s api=OpenAI.responses.create", request_id or "-") completion = self._client.responses.create( model=settings.openai_model, input=[ {"role": "system", "content": SYSTEM_PROMPT}, {"role": "user", "content": user_prompt}, ], text={"format": {"type": "json_object"}}, timeout=timeout_sec, ) output_text = completion.output_text or "" ms = (time.monotonic() - t0) * 1000 logger.info( "model_call_ok rid=%s api=responses duration_ms=%.1f output_chars=%d preview=%s", request_id or "-", ms, len(output_text), _preview_for_log(output_text, 380), ) logger.debug("model_call_raw rid=%s api=responses body=%s", request_id or "-", output_text) return self._parse_response_json(output_text) except Exception as exc: logger.warning( "model_call_fail rid=%s api=responses duration_ms=%.1f exc_type=%s exc=%s", request_id or "-", (time.monotonic() - t0) * 1000, type(exc).__name__, exc, ) continue if method == "chat": try: t_chat = time.monotonic() out = self._chat_completions_json(user_prompt, timeout_sec, request_id) if out is not None: return out if self._prefer_chat_first: logger.info( "model_call_stop rid=%s reason=dashscope_chat_no_valid_json duration_ms=%.1f", request_id or "-", (time.monotonic() - t_chat) * 1000, ) return None except Exception as exc: logger.warning( "model_call_fail rid=%s api=chat.completions duration_ms=%.1f exc_type=%s exc=%s", request_id or "-", (time.monotonic() - t0) * 1000, type(exc).__name__, exc, ) if self._prefer_chat_first: logger.info( "model_call_stop rid=%s reason=dashscope_chat_exception", request_id or "-", ) return None continue logger.error( "model_call_exhausted rid=%s methods_tried=%s result=none", request_id or "-", methods, ) return None def _normalize_result(self, data: dict) -> dict: title = str(data.get("title", "")).strip() summary = str(data.get("summary", "")).strip() body = str(data.get("body_markdown", "")).strip() if not title: title = "公众号改写稿" if not summary: summary = shorten(re.sub(r"\s+", " ", body), width=90, placeholder="...") body = re.sub(r"(?m)^#{1,6}\s+[^\n]*\n?", "", body).strip() body = self._normalize_body_length(body) body = self._format_markdown(body) return {"title": title, "summary": summary, "body_markdown": body} def _normalize_body_length(self, body: str) -> str: text = (body or "").strip() if not text: text = "(正文生成失败,请重试。)" if len(text) > MAX_BODY_CHARS: text = text[: MAX_BODY_CHARS - 1] + "…" return text def _quality_issues( self, req: RewriteRequest, source: str, normalized: dict, lenient: bool = False ) -> list[str]: issues: list[str] = [] title = normalized.get("title", "") summary = normalized.get("summary", "") body = normalized.get("body_markdown", "") min_title, max_title = (4, 30) if lenient else (6, 24) if len(title) < min_title or len(title) > max_title: issues.append(f"标题长度不理想(建议 {min_title}-{max_title} 字,短标题即可)") min_summary, max_summary = (20, 100) if lenient else (25, 90) if len(summary) < min_summary: issues.append("摘要过短") elif len(summary) > max_summary: issues.append(f"摘要过长(建议 ≤{max_summary} 字)") paragraphs = [p.strip() for p in re.split(r"\n\s*\n", body) if p.strip()] pc = len(paragraphs) need_p = 4 if lenient else 5 if pc < need_p: issues.append(f"正文需约 5 个自然段、空行分隔(当前 {pc} 段)") elif not lenient and pc > 6: issues.append(f"正文段落过多(当前 {pc} 段),请合并为 5 段左右") if len(body) > MAX_BODY_CHARS: issues.append(f"正文超过 {MAX_BODY_CHARS} 字(当前 {len(body)} 字),请压缩") elif len(body) < MIN_BODY_CHARS: issues.append(f"正文过短(当前阈值 ≥{MIN_BODY_CHARS} 字)") if re.search(r"(?m)^#+\s", body): issues.append("正文请勿使用 # 标题符号,只用自然段") if "**" not in body: issues.append("关键观点未加粗(建议 2~4 处)") paragraphs = [p.strip() for p in re.split(r"\n\s*\n", body) if p.strip()] if paragraphs: no_indent = sum(1 for p in paragraphs if not p.startswith("  ")) if no_indent >= max(2, len(paragraphs) // 2): issues.append("正文缺少首行缩进(建议每段段首使用两个全角空格)") if self._looks_like_raw_copy(source, body, lenient=lenient): issues.append("改写与原文相似度过高,疑似未充分重写") if req.avoid_words: bad_words = [w.strip() for w in re.split(r"[,,]\s*", req.avoid_words) if w.strip()] hit = [w for w in bad_words if w in body or w in summary or w in title] if hit: issues.append(f"命中禁用词: {', '.join(hit)}") ai_phrases = ["首先", "其次", "最后", "总而言之", "赋能", "闭环", "颠覆"] hit_ai = [w for w in ai_phrases if body.count(w) >= 3] if hit_ai: issues.append("存在明显 AI 套话堆叠") return issues def _looks_like_raw_copy(self, source: str, rewritten: str, lenient: bool = False) -> bool: src = re.sub(r"\s+", "", source or "") dst = re.sub(r"\s+", "", rewritten or "") if not src or not dst: return True if dst in src or src in dst: return True ratio = difflib.SequenceMatcher(a=src[:3500], b=dst[:3500]).ratio() threshold = 0.88 if lenient else 0.80 return ratio >= threshold def _model_output_usable(self, normalized: dict) -> bool: """模型 JSON 可解析且正文有实质内容时,允许软接受(不走模板保底)。""" body = (normalized.get("body_markdown") or "").strip() title = (normalized.get("title") or "").strip() if len(title) < 4 or len(body) < 40: return False if len(body) > MAX_BODY_CHARS + 80: return False return True def _format_markdown(self, text: str) -> str: body = text.replace("\r\n", "\n").strip() body = re.sub(r"\n{3,}", "\n\n", body) paragraphs = [p.strip() for p in re.split(r"\n\s*\n", body) if p.strip()] if not paragraphs: return body.strip() + "\n" # 若模型未加粗,兜底给第一段的核心短语加粗一次 merged = "\n\n".join(paragraphs) if "**" not in merged: first = paragraphs[0] first_plain = first.lstrip("  ").strip() phrase = re.split(r"[,。;:,:]", first_plain, maxsplit=1)[0].strip() phrase = phrase[:14] if len(phrase) >= 4 and phrase in first: paragraphs[0] = first.replace(phrase, f"**{phrase}**", 1) # 段首全角缩进:保持阅读习惯,避免顶格 out: list[str] = [] for p in paragraphs: seg = p.strip() if not seg: continue if seg.startswith("  "): out.append(seg) else: out.append("  " + seg.lstrip()) # 不能用 .strip(),否则会把首段的全角缩进「  」去掉 return "\n\n".join(out).rstrip() + "\n"