diff --git a/ARCHITECTURE.md b/ARCHITECTURE.md new file mode 100644 index 0000000..5e2b892 --- /dev/null +++ b/ARCHITECTURE.md @@ -0,0 +1,190 @@ +# AiVideo Architecture Guide + +## 1. 项目目标与当前定位 + +该项目是一个 AIGC 视频 POC,核心能力是把用户 prompt 转成三分镜短视频,并通过 Node API 流式返回进度与结果。当前实现已经覆盖: + +- 分镜生成(`script`) +- 单分镜润色(`refine`) +- 视频渲染与合成(`render`) +- `task_id` 级别隔离输出(`outputs/{task_id}/`) +- Docker 内置 ComfyUI + Node + Python 联动 +- 启动时自检(Comfy 可达性 + workflow/节点约束) + +整体设计是「Node 作为编排/网关,Python 作为生成引擎」。 + +## 2. 目录与职责 + +- `server/`:Node API + SSE 网关 + 启动自检入口 +- `engine/`:Python 生成引擎(LLM 分镜、TTS、Comfy、MoviePy 合成) +- `scripts/`:Comfy 连通性和 workflow 约束检查 +- `configs/config.yaml`:运行时配置(Comfy 地址、模型、workflow 映射等) +- `docker-compose.yml`:`aivideo` + `comfyui` 双服务部署 +- `dev.sh`:本地开发启动/日志/重建封装 +- `outputs/{task_id}/`:任务级产物目录(分镜、润色结果、最终视频) + +## 3. 运行架构(容器级) + +- `aivideo` 服务 + - 运行 Node (`server/index.js`) + - Node 通过 `spawn` 调用 Python(`python -m engine.main`) + - 对外暴露 `3000` +- `comfyui` 服务 + - 默认镜像:`jamesbrink/comfyui:latest` + - 对外暴露 `8188` + - 挂载 `./ComfyUI/*` 到容器 `/comfyui/*` +- 服务连接 + - `aivideo` 通过 `http://comfyui:8188` 访问 ComfyUI API(容器内 DNS) + +## 4. 应用架构(进程级) + +### 4.1 Node 层(`server/index.js`) + +职责: + +- 提供 HTTP/SSE 接口 +- 统一生成 `task_id` 并创建输出目录 +- 把请求参数透传给 Python 引擎 +- 把 Python stdout 协议行转成 SSE 事件 +- 启动前执行自检(`check_comfy.py` + `inspect_comfy_node.py`) + +主要接口: + +- `GET /api/health` +- `GET /api/script`(SSE) +- `POST /api/refine`(JSON) +- `POST /api/render`(SSE) +- `GET /api/static/...`(输出视频静态托管,禁缓存) + +并发策略(当前): + +- 渲染接口使用单全局锁 `isBusy`(同一时刻只允许一个渲染) + +### 4.2 Python 引擎层(`engine/main.py`) + +职责: + +- 解析参数并分发 `step`:`script/refine/render` +- 处理全局风格与角色注入 +- 与 OpenAI、ComfyUI、TTS、MoviePy 协同 +- 按协议输出进度与结构化结果(`SCENE_JSON`、`PROG`、`RENDER_DONE`) + +子模块职责: + +- `engine/script_gen.py`:LLM 分镜生成与润色 +- `engine/audio_gen.py`:Edge TTS 合成旁白 +- `engine/comfy_client.py`:提交 workflow、轮询 history、提取产物 +- `engine/video_editor.py`:字幕叠加 + 转场 + 最终拼接 +- `engine/config.py`:YAML 配置读取 + +## 5. 核心流程 + +### 5.1 Script 生成 + +1. Node 收到 `GET /api/script` +2. 生成 `task_id`,创建 `outputs/{task_id}` +3. Node spawn Python `--step script` +4. Python 调 LLM 生成三分镜(无 key 时可 mock fallback) +5. Python 输出多行 `SCENE_JSON ...` +6. Node 将其转发为 SSE `scene` 事件 + +### 5.2 Refine 润色 + +1. Node 收到 `POST /api/refine` +2. 透传当前 scenes/scene 到 Python stdin +3. Python 调 LLM 润色指定分镜 +4. 返回 `SCENE_JSON`,Node 组装 JSON 响应 + +### 5.3 Render 渲染 + +1. Node 收到 `POST /api/render`(SSE) +2. 全局 `isBusy` 判定是否可渲染 +3. Python 先做 TTS(并发),再逐分镜调 Comfy +4. 收集视频 + 音频,MoviePy 合成 `final.mp4` +5. Python 输出 `PROG` 进度与 `RENDER_DONE` +6. Node 转发 SSE 完成事件 + +## 6. 关键设计约束 + +- `task_id` 必须贯穿 API 与引擎,保证产物隔离 +- 启动自检失败时,服务不启动(fail fast) +- workflow 参数注入基于: + - 明确 node_id,或 + - class_type fallback 自动定位 +- 全局风格/角色必须双重注入: + - LLM prompt 约束 + - 渲染前 image_prompt 装饰(character + style + scene) + +## 7. 当前架构优势 + +- **职责拆分清晰**:Node 编排、Python 算法,边界明确 +- **可观测性较好**:SSE 实时进度 + 结构化协议行 +- **生产思路正确**:自检机制避免“半可用”状态 +- **兼容能力强**:mock 路径可脱离 Comfy/LLM 快速调通 + +## 8. 主要架构风险与优化方向 + +### P0(优先处理) + +1. **作业状态只在内存** + - 问题:Node 重启后任务状态丢失,前端不可恢复 + - 建议:引入任务元数据存储(SQLite/Redis),记录状态机(queued/running/succeeded/failed) + +2. **单点渲染锁 `isBusy`** + - 问题:无法扩展并发;请求高峰体验差 + - 建议:升级为队列模型(本地队列或 Redis/BullMQ),支持排队和取消 + +3. **SSE 协议基于字符串前缀** + - 问题:协议演进脆弱,难版本化 + - 建议:统一 JSON line 协议(字段:`type`, `task_id`, `ts`, `payload`, `version`) + +### P1(中期) + +4. **配置与环境耦合较松散** + - 建议:增加 config schema 校验(pydantic/JSON schema),启动即检查缺项与类型 + +5. **Comfy 产物识别依赖 history + 文件存在** + - 建议:扩展更稳定的完成判定(WebSocket event 或更严格 history 状态判断) + +6. **缺少全链路 trace id** + - 建议:在 Node/Python/Comfy 请求中统一注入 `task_id` 和 `request_id` + +### P2(长期) + +7. **引擎内聚度可再提升** + - 建议:把 `script/refine/render` 拆成独立 use-case 模块,CLI 仅作参数适配 + +8. **测试体系不足** + - 建议: + - 单元测试:config、workflow 注入、scene 解析 + - 集成测试:mock 渲染链路 + - 冒烟测试:Docker 启动 + `/api/health` + +## 9. 推荐重构路线(4 周) + +- 第 1 周:任务状态持久化 + API 状态查询接口(`/api/tasks/:id`) +- 第 2 周:渲染队列化(先单 worker),替换 `isBusy` +- 第 3 周:统一事件协议(JSON line + version),前后端同时改 +- 第 4 周:补测试与可观测(结构化日志、错误码、性能指标) + +## 10. 建议新增接口(便于运维和前端) + +- `GET /api/tasks/:task_id`:任务状态与阶段信息 +- `POST /api/tasks/:task_id/cancel`:取消任务 +- `GET /api/tasks/:task_id/artifacts`:列出产物路径和类型 +- `GET /api/system/checks`:最近一次自检结果 + +## 11. 性能优化清单(先易后难) + +- TTS 结果按文本 hash 缓存,避免重复合成 +- 分镜视频生成支持失败重试与断点继续 +- MoviePy 合成参数按场景切换(开发 `veryfast`,生产 `medium/slow`) +- 对 `outputs/` 增加清理策略(TTL + 最大磁盘占用阈值) + +## 12. 你下一步可以直接做的事 + +1. 先落地任务持久化和状态查询(收益最大、侵入最小) +2. 再把 `isBusy` 改为队列(并保留单 worker) +3. 最后统一事件协议,减少前后端耦合与兼容风险 + +这份文档的目的不是重写现有实现,而是在保留当前可用链路的前提下,把系统逐步推进到可扩展的生产形态。 diff --git a/engine/assembler.py b/engine/assembler.py new file mode 100644 index 0000000..73afd28 --- /dev/null +++ b/engine/assembler.py @@ -0,0 +1,27 @@ +from __future__ import annotations + +from pathlib import Path + +from moviepy import VideoFileClip, concatenate_videoclips + + +def assemble_clips(clips: list[str | Path], output_path: str | Path) -> Path: + out = Path(output_path) + out.parent.mkdir(parents=True, exist_ok=True) + if not clips: + raise ValueError("clips must not be empty") + + vclips: list[VideoFileClip] = [] + for c in clips: + vclips.append(VideoFileClip(str(c))) + + final = concatenate_videoclips(vclips, method="compose") + try: + fps = vclips[0].fps if vclips and vclips[0].fps else 24 + final.write_videofile(str(out), codec="libx264", audio_codec="aac", fps=fps, preset="medium", threads=4) + finally: + final.close() + for c in vclips: + c.close() + return out + diff --git a/engine/director.py b/engine/director.py new file mode 100644 index 0000000..f6afe9a --- /dev/null +++ b/engine/director.py @@ -0,0 +1,43 @@ +from __future__ import annotations + +from typing import Any + + +def _read_scene(scene: Any) -> tuple[str, str, str]: + if hasattr(scene, "image_prompt") and hasattr(scene, "video_motion") and hasattr(scene, "narration"): + return ( + str(getattr(scene, "image_prompt", "")).strip(), + str(getattr(scene, "video_motion", "")).strip(), + str(getattr(scene, "narration", "")).strip(), + ) + if isinstance(scene, dict): + return ( + str(scene.get("image_prompt", "")).strip(), + str(scene.get("video_motion", scene.get("motion", ""))).strip(), + str(scene.get("narration", scene.get("tts", ""))).strip(), + ) + return ("", "", "") + + +def scenes_to_shots(scenes: list) -> list[dict[str, Any]]: + shots: list[dict[str, Any]] = [] + for scene_idx, scene in enumerate(scenes, start=1): + image_prompt, motion, tts = _read_scene(scene) + scene_id = f"scene_{scene_idx:02d}" + shot_id = f"{scene_id}_01" + # Keep default duration simple and deterministic for MVP. + duration = 3 + shots.append( + { + "shot_id": shot_id, + "scene_id": scene_id, + "duration": int(duration), + "image_prompt": image_prompt, + "motion": motion, + "camera": "", + "tts": tts, + "status": "pending", + } + ) + return shots + diff --git a/engine/main.py b/engine/main.py index 2b05295..2c9bf28 100644 --- a/engine/main.py +++ b/engine/main.py @@ -13,9 +13,13 @@ from moviepy import ImageClip from PIL import Image, ImageDraw, ImageFont from engine.audio_gen import synthesize_scenes +from engine.assembler import assemble_clips from engine.comfy_client import ComfyClient from engine.config import AppConfig +from engine.director import scenes_to_shots +from engine.shot_executor import render_shot from engine.script_gen import generate_scenes, refine_scene +from engine.task_store import create_task, update_shot_status, update_task_status from engine.types import Scene from engine.video_editor import Segment, render_final @@ -65,6 +69,10 @@ def _prog(p: float, msg: str) -> None: _emit("PROG " + json.dumps({"p": p2, "msg": msg}, ensure_ascii=False)) +def _prog_shot(shot_id: str, status: str) -> None: + _emit(f"PROG_SHOT {shot_id} {status}") + + def _normalize_style(style: str | None) -> str: s = (style or "").strip() if not s: @@ -308,13 +316,43 @@ def step_refine( def step_render(prompt: str, cfg: AppConfig, mock: bool, *, style: str | None, character: str | None, out_dir: Path) -> int: payload = _read_stdin_json() - scenes = _parse_scenes_from_obj(payload) + scenes_raw = _parse_scenes_from_obj(payload) + scenes = [ + Scene( + image_prompt=_decorate_image_prompt(s.image_prompt, style=style, character=character), + video_motion=s.video_motion, + narration=s.narration, + ) + for s in scenes_raw + ] + shots = scenes_to_shots(scenes) out_dir.mkdir(parents=True, exist_ok=True) + task_id = out_dir.name + create_task(task_id, shots) + update_task_status(task_id, "running") _prog(0.05, "Start render") - out = asyncio.run(_render_from_scenes(prompt, scenes, cfg, mock=mock, style=style, character=character, out_dir=out_dir)) - _prog(1.0, "Render finished") - _emit("RENDER_DONE " + json.dumps({"output": str(out)}, ensure_ascii=False)) - return 0 + clips: list[str] = [] + total = max(1, len(shots)) + try: + for idx, shot in enumerate(shots, start=1): + shot_id = str(shot.get("shot_id", f"shot_{idx:02d}")) + update_shot_status(task_id, shot_id, "running") + _prog_shot(shot_id, "running") + clip_path = render_shot(shot, out_dir, cfg, mock=mock) + clips.append(clip_path) + update_shot_status(task_id, shot_id, "done") + _prog_shot(shot_id, "done") + _prog(0.05 + 0.8 * idx / total, f"Rendered shot {idx}/{total}") + + final_out = out_dir / "final.mp4" + out = assemble_clips(clips, final_out) + update_task_status(task_id, "done") + _prog(1.0, "Render finished") + _emit("RENDER_DONE " + json.dumps({"output": str(out)}, ensure_ascii=False)) + return 0 + except Exception: + update_task_status(task_id, "failed") + raise def main() -> int: diff --git a/engine/shot_executor.py b/engine/shot_executor.py new file mode 100644 index 0000000..99bc95c --- /dev/null +++ b/engine/shot_executor.py @@ -0,0 +1,113 @@ +from __future__ import annotations + +import asyncio +import random +from pathlib import Path +from typing import Any + +from moviepy import AudioFileClip, CompositeVideoClip, TextClip, VideoFileClip, vfx + +from .audio_gen import synthesize_one +from .comfy_client import ComfyClient +from .config import AppConfig + + +def _fit_video_to_audio(video: VideoFileClip, audio: AudioFileClip) -> VideoFileClip: + if audio.duration is None or video.duration is None: + return video.with_audio(audio) + if audio.duration > video.duration: + video = video.with_effects([vfx.Loop(duration=audio.duration)]) + elif video.duration > audio.duration: + video = video.subclipped(0, audio.duration) + return video.with_audio(audio) + + +def _subtitle_clip(text: str, size: tuple[int, int], duration: float) -> TextClip: + return ( + TextClip( + text=text, + font_size=44, + color="white", + stroke_color="black", + stroke_width=2, + size=(int(size[0] * 0.92), None), + method="caption", + ) + .with_position(("center", "bottom")) + .with_duration(duration) + .with_opacity(0.95) + ) + + +async def _render_shot_async( + shot: dict[str, Any], + output_dir: str | Path, + cfg: AppConfig, + *, + mock: bool = False, +) -> str: + out_dir = Path(output_dir) + out_dir.mkdir(parents=True, exist_ok=True) + clips_dir = out_dir / "clips" + audio_dir = out_dir / "audio" + clips_dir.mkdir(parents=True, exist_ok=True) + audio_dir.mkdir(parents=True, exist_ok=True) + + shot_id = str(shot.get("shot_id", "unknown")) + image_prompt = str(shot.get("image_prompt", "")).strip() + motion = str(shot.get("motion", "")).strip() + tts_text = str(shot.get("tts", "")).strip() + duration_s = max(1.0, float(shot.get("duration", 3))) + + voice = str(cfg.get("tts.voice", "zh-CN-XiaoxiaoNeural")) + rate = str(cfg.get("tts.rate", "+0%")) + volume = str(cfg.get("tts.volume", "+0%")) + audio_path = audio_dir / f"shot_{shot_id}.mp3" + audio_asset = await synthesize_one(tts_text or " ", audio_path, voice, rate, volume) + + if mock: + from engine.main import _ensure_mock_image, _make_mock_video # local import to avoid circular at module import + + mock_size = cfg.get("video.mock_size", [1024, 576]) + w, h = int(mock_size[0]), int(mock_size[1]) + mock_image = _ensure_mock_image(Path("./assets/mock.png"), (w, h)) + fps = int(cfg.get("video.mock_fps", 24)) + raw_video_path = out_dir / f"shot_raw_{shot_id}.mp4" + _make_mock_video(raw_video_path, mock_image, max(duration_s, audio_asset.duration_s), fps=fps) + else: + comfy = ComfyClient(cfg) + wf = comfy.load_workflow() + seed = random.randint(1, 2_147_483_647) + wf_i = comfy.inject_params(wf, image_prompt=image_prompt, seed=seed, motion_prompt=motion or None) + result = await comfy.run_workflow(wf_i) + candidates = [p for p in result.output_files if p.suffix.lower() in {".mp4", ".mov", ".webm"}] + raw_video_path = candidates[0] if candidates else result.output_files[0] + + clip_out = clips_dir / f"shot_{shot_id}.mp4" + v = VideoFileClip(str(raw_video_path)) + a = AudioFileClip(str(audio_asset.path)) + try: + v2 = _fit_video_to_audio(v, a) + w2, h2 = v2.size + subtitle = _subtitle_clip(tts_text, (w2, h2), v2.duration or a.duration or duration_s) + comp = CompositeVideoClip([v2, subtitle]) + try: + comp.write_videofile(str(clip_out), codec="libx264", audio_codec="aac", fps=v2.fps or 24, preset="veryfast") + finally: + comp.close() + finally: + v.close() + a.close() + return str(clip_out) + + +def render_shot( + shot: dict[str, Any], + output_dir: str | Path, + cfg: AppConfig | None = None, + *, + mock: bool = False, +) -> str: + cfg2 = cfg or AppConfig.load("./configs/config.yaml") + return asyncio.run(_render_shot_async(shot, output_dir, cfg2, mock=mock)) + diff --git a/engine/task_store.py b/engine/task_store.py new file mode 100644 index 0000000..57dced0 --- /dev/null +++ b/engine/task_store.py @@ -0,0 +1,68 @@ +from __future__ import annotations + +import json +from pathlib import Path +from typing import Any + + +def _task_path(task_id: str, base_dir: str | Path = "./outputs") -> Path: + return Path(base_dir) / str(task_id) / "task.json" + + +def create_task(task_id: str, shots: list[dict[str, Any]], base_dir: str | Path = "./outputs") -> dict[str, Any]: + p = _task_path(task_id, base_dir=base_dir) + p.parent.mkdir(parents=True, exist_ok=True) + data = { + "task_id": str(task_id), + "status": "queued", + "shots": [ + { + "shot_id": str(s.get("shot_id", "")), + "status": str(s.get("status", "pending") or "pending"), + } + for s in shots + ], + } + p.write_text(json.dumps(data, ensure_ascii=False, indent=2), encoding="utf-8") + return data + + +def load_task(task_id: str, base_dir: str | Path = "./outputs") -> dict[str, Any]: + p = _task_path(task_id, base_dir=base_dir) + if not p.exists(): + raise FileNotFoundError(f"task file not found: {p}") + raw = json.loads(p.read_text(encoding="utf-8")) + if not isinstance(raw, dict): + raise ValueError("task.json must be an object") + return raw + + +def _save_task(task_id: str, data: dict[str, Any], base_dir: str | Path = "./outputs") -> None: + p = _task_path(task_id, base_dir=base_dir) + p.parent.mkdir(parents=True, exist_ok=True) + p.write_text(json.dumps(data, ensure_ascii=False, indent=2), encoding="utf-8") + + +def update_shot_status(task_id: str, shot_id: str, status: str, base_dir: str | Path = "./outputs") -> dict[str, Any]: + data = load_task(task_id, base_dir=base_dir) + shots = data.get("shots") + if not isinstance(shots, list): + raise ValueError("task.json shots must be an array") + found = False + for s in shots: + if isinstance(s, dict) and str(s.get("shot_id", "")) == str(shot_id): + s["status"] = str(status) + found = True + break + if not found: + shots.append({"shot_id": str(shot_id), "status": str(status)}) + _save_task(task_id, data, base_dir=base_dir) + return data + + +def update_task_status(task_id: str, status: str, base_dir: str | Path = "./outputs") -> dict[str, Any]: + data = load_task(task_id, base_dir=base_dir) + data["status"] = str(status) + _save_task(task_id, data, base_dir=base_dir) + return data + diff --git a/server/index.js b/server/index.js index 3ce4085..6ea8386 100644 --- a/server/index.js +++ b/server/index.js @@ -30,6 +30,20 @@ app.get("/api/health", (_req, res) => { res.status(200).json({ ok: true }); }); +app.get("/api/tasks/:task_id", (req, res) => { + const taskId = String(req.params.task_id || "").trim(); + if (!taskId) return res.status(400).json({ error: "missing task_id" }); + const p = path.join(outputsDir, taskId, "task.json"); + if (!fs.existsSync(p)) return res.status(404).json({ error: "task not found", task_id: taskId }); + try { + const raw = fs.readFileSync(p, "utf8"); + const data = JSON.parse(raw); + return res.json(data); + } catch (e) { + return res.status(500).json({ error: "failed to read task", task_id: taskId, detail: String(e) }); + } +}); + function sseHeaders(res) { res.setHeader("Content-Type", "text/event-stream; charset=utf-8"); res.setHeader("Cache-Control", "no-cache, no-transform"); @@ -237,6 +251,17 @@ app.post("/api/render", (req, res) => { for (const line of parts) { if (!line) continue; if (line.startsWith("PROG ")) sseSend(res, "prog", line.slice("PROG ".length)); + else if (line.startsWith("PROG_SHOT ")) { + const rest = line.slice("PROG_SHOT ".length).trim(); + const firstSpace = rest.indexOf(" "); + if (firstSpace > 0) { + const shotId = rest.slice(0, firstSpace).trim(); + const status = rest.slice(firstSpace + 1).trim(); + sseSend(res, "shot_progress", JSON.stringify({ shot_id: shotId, status })); + } else { + sseSend(res, "line", line); + } + } else if (line.startsWith("RENDER_DONE ")) sseSend(res, "done", line.slice("RENDER_DONE ".length)); else sseSend(res, "line", line); }