Compare commits

...

5 Commits

Author SHA1 Message Date
Daniel
89b21dd314 fix: fix page styles 2026-04-14 12:05:56 +08:00
Daniel
508c28ce31 fix: optimize architecture 2026-03-25 19:35:37 +08:00
Daniel
34786b37c7 fix: fix bugs 2026-03-25 16:59:05 +08:00
Daniel
a2f224d01f feat: optimize architecture 2026-03-25 13:43:00 +08:00
Daniel
8991f2a2d7 fix: polish content 2026-03-25 13:33:48 +08:00
228 changed files with 6301 additions and 300 deletions

190
ARCHITECTURE.md Normal file
View File

@@ -0,0 +1,190 @@
# AiVideo Architecture Guide
## 1. Project Goals and Current Positioning
This project is an AIGC video POC. Its core capability is turning a user prompt into a three-scene short video, with progress and results streamed back through the Node API. The current implementation covers:
- Storyboard generation (`script`)
- Single-scene refinement (`refine`)
- Video rendering and composition (`render`)
- `task_id`-scoped output isolation (`outputs/{task_id}/`)
- Docker-bundled ComfyUI + Node + Python integration
- Startup self-check (ComfyUI reachability + workflow/node constraints)
The overall design is "Node as orchestrator/gateway, Python as generation engine".
## 2. Directories and Responsibilities
- `server/`: Node API + SSE gateway + startup self-check entry point
- `engine/`: Python generation engine (LLM storyboarding, TTS, ComfyUI, MoviePy composition)
- `scripts/`: ComfyUI connectivity and workflow constraint checks
- `configs/config.yaml`: runtime configuration (ComfyUI address, models, workflow mappings, etc.)
- `docker-compose.yml`: two-service deployment of `aivideo` + `comfyui`
- `dev.sh`: wrapper for local dev start/logs/rebuild
- `outputs/{task_id}/`: per-task artifact directory (storyboards, refinement results, final video)
## 3. Runtime Architecture (Container Level)
- `aivideo` service
  - Runs Node (`server/index.js`)
  - Node calls Python via `spawn`: `python -m engine.main`
  - Exposes port `3000`
- `comfyui` service
  - Default image: `jamesbrink/comfyui:latest`
  - Exposes port `8188`
  - Mounts `./ComfyUI/*` into the container at `/comfyui/*`
- Service connectivity
  - `aivideo` reaches the ComfyUI API at `http://comfyui:8188` (container-internal DNS)
## 4. Application Architecture (Process Level)
### 4.1 Node Layer (`server/index.js`)
Responsibilities:
- Serve the HTTP/SSE endpoints
- Generate the `task_id` and create the per-task output directory
- Pass request parameters through to the Python engine
- Convert Python stdout protocol lines into SSE events
- Run the self-check before startup (`check_comfy.py` + `inspect_comfy_node.py`)
Main endpoints:
- `GET /api/health`
- `GET /api/script` (SSE)
- `POST /api/refine` (JSON)
- `POST /api/render` (SSE)
- `GET /api/static/...` (static hosting for output videos, caching disabled)
Concurrency strategy (current):
- The render endpoint uses a single global lock, `isBusy` (only one render runs at a time)
### 4.2 Python Engine Layer (`engine/main.py`)
Responsibilities:
- Parse arguments and dispatch on `step` (`script`/`refine`/`render`)
- Handle global style and character injection
- Coordinate OpenAI, ComfyUI, TTS, and MoviePy
- Emit progress and structured results per the stdout protocol (`SCENE_JSON` / `PROG` / `RENDER_DONE`)
Submodule responsibilities:
- `engine/script_gen.py`: LLM storyboard generation and refinement
- `engine/audio_gen.py`: Edge TTS narration synthesis
- `engine/comfy_client.py`: submit workflows, poll history, extract artifacts
- `engine/video_editor.py`: subtitle overlay + transitions + final concatenation
- `engine/config.py`: YAML configuration loading
## 5. Core Flows
### 5.1 Script Generation
1. Node receives `GET /api/script`
2. Generates a `task_id` and creates `outputs/{task_id}`
3. Node spawns Python with `--step script`
4. Python calls the LLM to generate three scenes (falls back to mock output when no key is set)
5. Python emits multiple `SCENE_JSON ...` lines
6. Node forwards them as SSE `scene` events
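The stdout protocol above can be sketched in Python as follows. The line format here is illustrative; only the prefixes `SCENE_JSON`/`PROG`/`RENDER_DONE` are taken from the engine, and the payload fields are assumptions.

```python
import json
import sys

def emit(prefix: str, payload: dict) -> None:
    # One protocol line per event: "<PREFIX> <json>", flushed so Node sees it immediately.
    sys.stdout.write(f"{prefix} {json.dumps(payload, ensure_ascii=False)}\n")
    sys.stdout.flush()

def parse_line(line: str) -> tuple[str, dict]:
    # Node-side equivalent: split on the first space, JSON-decode the rest.
    prefix, _, rest = line.strip().partition(" ")
    return prefix, (json.loads(rest) if rest else {})

emit("SCENE_JSON", {"index": 0, "image_prompt": "city night", "narration": "..."})
```

Because each event is one flushed line, Node can parse the child process's stdout incrementally and forward each line as an SSE event without buffering the whole output.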
### 5.2 Refine
1. Node receives `POST /api/refine`
2. Passes the current scenes/scene through to Python stdin
3. Python calls the LLM to refine the specified scene
4. Python returns `SCENE_JSON`; Node assembles the JSON response
### 5.3 Render
1. Node receives `POST /api/render` (SSE)
2. The global `isBusy` lock decides whether rendering may start
3. Python runs TTS first (concurrently), then calls ComfyUI scene by scene
4. Video + audio are collected; MoviePy composes `final.mp4`
5. Python emits `PROG` progress and `RENDER_DONE`
6. Node forwards the SSE completion event
## 6. Key Design Constraints
- `task_id` must flow through both the API and the engine to keep artifacts isolated
- If the startup self-check fails, the service does not start (fail fast)
- Workflow parameter injection is based on:
  - an explicit node_id
  - class_type-based fallback auto-location
- The global style/character must be injected twice:
  - as LLM prompt constraints
  - as image_prompt decoration before rendering (character + style + scene)
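The second injection step can be sketched as below. The helper name `decorate_image_prompt` and the comma-joined format are assumptions for illustration, not the engine's actual function.

```python
def decorate_image_prompt(scene_prompt: str, character: str = "", style: str = "") -> str:
    # Prepend the global character and style so every shot stays visually consistent,
    # regardless of what the LLM put into the per-scene prompt.
    parts = [p.strip() for p in (character, style, scene_prompt) if p and p.strip()]
    return ", ".join(parts)
```

Decorating at render time (in addition to constraining the LLM prompt) guards against the LLM dropping the global style from individual scenes.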
## 7. Strengths of the Current Architecture
- **Clear separation of responsibilities**: Node orchestrates, Python generates; the boundary is explicit
- **Good observability**: real-time SSE progress + structured protocol lines
- **Production-minded**: the self-check avoids a "half-working" state
- **Strong fallback compatibility**: the mock path runs without ComfyUI/LLM for quick end-to-end checks
## 8. Main Architectural Risks and Improvements
### P0 (address first)
1. **Job state lives only in memory**
   - Problem: task state is lost when Node restarts, and the frontend cannot recover
   - Suggestion: add a task metadata store (SQLite/Redis) with a state machine (queued/running/succeeded/failed)
2. **Single global render lock `isBusy`**
   - Problem: concurrency cannot scale, and the experience degrades under load
   - Suggestion: upgrade to a queue model (local queue or Redis/BullMQ) with queuing and cancellation
3. **The SSE protocol relies on string prefixes**
   - Problem: fragile to evolve and hard to version
   - Suggestion: unify on a JSON-line protocol (fields: `type`, `task_id`, `ts`, `payload`, `version`)
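An illustrative JSON-line event with the suggested fields; this shape is a proposal, not the current protocol:

```python
import json
import time

def make_event(event_type: str, task_id: str, payload: dict, version: int = 1) -> str:
    # One self-describing JSON object per line; no string-prefix parsing needed,
    # and `version` lets consumers handle protocol evolution explicitly.
    return json.dumps(
        {"type": event_type, "task_id": task_id, "ts": time.time(), "payload": payload, "version": version},
        ensure_ascii=False,
    )
```

Both sides then share one parser (`json.loads` per line), and new fields can be added without breaking existing prefix matching.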
### P1 (mid-term)
4. **Config and environment are loosely coupled**
   - Suggestion: add config schema validation (pydantic/JSON Schema) so missing keys and wrong types fail at startup
5. **ComfyUI artifact detection relies on history polling + file existence**
   - Suggestion: adopt a more reliable completion signal (WebSocket events or stricter history status checks)
6. **No end-to-end trace id**
   - Suggestion: inject `task_id` / `request_id` consistently into Node/Python/ComfyUI requests
### P2 (long-term)
7. **Engine cohesion can still improve**
   - Suggestion: split `script/refine/render` into independent use-case modules, with the CLI doing only argument adaptation
8. **Insufficient test coverage**
   - Suggestion:
     - unit tests (config, workflow injection, scene parsing)
     - integration tests (mock render pipeline)
     - smoke tests (Docker startup + `/api/health`)
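For example, a unit test of the class_type-fallback workflow injection could look like this sketch. The `inject_prompt` helper is hypothetical; the real injection logic lives in the engine.

```python
def inject_prompt(workflow: dict, text: str, class_type: str = "CLIPTextEncode") -> dict:
    # Fallback location: find the first node of the given class_type and set its text input.
    for node in workflow.values():
        if node.get("class_type") == class_type:
            node["inputs"]["text"] = text
            return workflow
    raise KeyError(f"no node with class_type {class_type!r}")

def test_inject_prompt():
    wf = {"6": {"class_type": "CLIPTextEncode", "inputs": {"text": "", "clip": ["4", 1]}}}
    assert inject_prompt(wf, "city night")["6"]["inputs"]["text"] == "city night"

test_inject_prompt()
```

Tests like this run offline against plain workflow dicts, so they need neither ComfyUI nor Docker.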
## 9. Recommended Refactoring Roadmap (4 Weeks)
- Week 1: task state persistence + a status query endpoint (`/api/tasks/:id`)
- Week 2: queued rendering (single worker at first), replacing `isBusy`
- Week 3: unified event protocol (JSON line + version), with frontend and backend updated together
- Week 4: tests and observability (structured logs, error codes, performance metrics)
## 10. Suggested New Endpoints (for Ops and the Frontend)
- `GET /api/tasks/:task_id`: task status and stage information
- `POST /api/tasks/:task_id/cancel`: cancel a task
- `GET /api/tasks/:task_id/artifacts`: list artifact paths and types
- `GET /api/system/checks`: result of the most recent self-check
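A possible response shape for `GET /api/tasks/:task_id`; every field name and stage value here is a suggestion, since the endpoint does not exist yet:

```python
# Suggested (not yet implemented) response body for GET /api/tasks/:task_id.
example_task = {
    "task_id": "task-123",
    "state": "running",       # queued | running | succeeded | failed
    "stage": "render",        # e.g. script | tts | render | compose (illustrative stages)
    "progress": 0.6,
    "created_at": "2026-03-25T13:43:00+08:00",
    "artifacts": ["outputs/task-123/scene_0.png"],
}
```

Keeping `state` as a small closed enum makes the frontend's recovery logic after a Node restart straightforward.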
## 11. Performance Optimization Checklist (Easiest First)
- Cache TTS results by text hash to avoid repeated synthesis
- Add retry and resume support to per-scene video generation
- Switch MoviePy encoding presets by environment (`veryfast` in dev, `medium/slow` in prod)
- Add a cleanup policy for `outputs/` (TTL + maximum disk usage threshold)
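The text-hash TTS cache can be sketched as follows; `CACHE_DIR` and the function names are assumptions, and `synthesize` stands in for the real Edge TTS call:

```python
import hashlib
from pathlib import Path

CACHE_DIR = Path("outputs/.tts_cache")  # hypothetical cache location

def tts_cache_path(text: str, voice: str) -> Path:
    # Key the cache on (voice, text) so changing the voice never reuses stale audio.
    digest = hashlib.sha256(f"{voice}\x00{text}".encode("utf-8")).hexdigest()
    return CACHE_DIR / f"{digest}.mp3"

def synthesize_cached(text: str, voice: str, synthesize) -> Path:
    # `synthesize(text, path)` is the real TTS callable, e.g. an Edge TTS wrapper.
    path = tts_cache_path(text, voice)
    if not path.exists():
        CACHE_DIR.mkdir(parents=True, exist_ok=True)
        synthesize(text, path)
    return path
```

Since narration lines repeat often during iterative refinement, this turns most re-renders' TTS step into a file lookup.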
## 12. What to Do Next
1. Land task persistence and the status query first (highest payoff, least intrusive)
2. Then replace `isBusy` with a queue (keeping a single worker)
3. Finally unify the event protocol to reduce frontend/backend coupling and compatibility risk
The goal of this document is not to rewrite the current implementation, but to evolve the system step by step toward a scalable production shape while keeping the working pipeline intact.

View File

@@ -1,6 +1,12 @@
FROM python:3.10-slim
ARG NODE_BASE_IMAGE=docker.m.daocloud.io/library/node:20-bookworm-slim
ARG CUDA_BASE_IMAGE=docker.m.daocloud.io/nvidia/cuda:12.1.1-runtime-ubuntu22.04
ENV PYTHONDONTWRITEBYTECODE=1 \
FROM ${NODE_BASE_IMAGE} AS node20
FROM ${CUDA_BASE_IMAGE} AS builder
ENV DEBIAN_FRONTEND=noninteractive \
PYTHONDONTWRITEBYTECODE=1 \
PYTHONUNBUFFERED=1 \
PIP_DISABLE_PIP_VERSION_CHECK=1 \
PIP_NO_CACHE_DIR=1 \
@@ -10,26 +16,53 @@ ENV PYTHONDONTWRITEBYTECODE=1 \
WORKDIR /app
# ffmpeg is required for MoviePy (audio duration + encoding).
RUN if [ -f /etc/apt/sources.list ]; then \
sed -i 's|http://deb.debian.org/debian|https://mirrors.tuna.tsinghua.edu.cn/debian|g; s|http://security.debian.org/debian-security|https://mirrors.tuna.tsinghua.edu.cn/debian-security|g' /etc/apt/sources.list; \
fi \
&& if [ -f /etc/apt/sources.list.d/debian.sources ]; then \
sed -i 's|http://deb.debian.org/debian|https://mirrors.tuna.tsinghua.edu.cn/debian|g; s|http://security.debian.org/debian-security|https://mirrors.tuna.tsinghua.edu.cn/debian-security|g' /etc/apt/sources.list.d/debian.sources; \
fi \
&& apt-get update && apt-get install -y --no-install-recommends \
ffmpeg \
fonts-dejavu-core \
nodejs \
npm \
&& rm -rf /var/lib/apt/lists/*
# Base deps + Python 3.10
RUN sed -i 's|http://archive.ubuntu.com/ubuntu|https://mirrors.tuna.tsinghua.edu.cn/ubuntu|g; s|http://security.ubuntu.com/ubuntu|https://mirrors.tuna.tsinghua.edu.cn/ubuntu|g' /etc/apt/sources.list \
&& apt-get -o Acquire::Retries=5 update \
&& apt-get -o Acquire::Retries=5 install -y --no-install-recommends --fix-missing \
ca-certificates curl gnupg \
python3.10 python3.10-distutils python3-pip \
ffmpeg fonts-dejavu-core \
&& ln -sf /usr/bin/python3.10 /usr/local/bin/python \
&& rm -rf /var/lib/apt/lists/*
COPY --from=node20 /usr/local /usr/local
COPY requirements.txt /app/requirements.txt
RUN pip install -r /app/requirements.txt
RUN python3.10 -m pip install -r /app/requirements.txt
COPY server/package.json server/package-lock.json /app/server/
RUN cd /app/server && npm ci --omit=dev
COPY . /app
RUN cd /app/server && npm i --omit=dev
FROM ${CUDA_BASE_IMAGE} AS runtime
ENV DEBIAN_FRONTEND=noninteractive \
PYTHONDONTWRITEBYTECODE=1 \
PYTHONUNBUFFERED=1 \
PIP_DISABLE_PIP_VERSION_CHECK=1 \
PIP_NO_CACHE_DIR=1 \
PIP_INDEX_URL=https://pypi.tuna.tsinghua.edu.cn/simple \
PIP_TRUSTED_HOST=pypi.tuna.tsinghua.edu.cn \
NPM_CONFIG_REGISTRY=https://registry.npmmirror.com
WORKDIR /app
RUN sed -i 's|http://archive.ubuntu.com/ubuntu|https://mirrors.tuna.tsinghua.edu.cn/ubuntu|g; s|http://security.ubuntu.com/ubuntu|https://mirrors.tuna.tsinghua.edu.cn/ubuntu|g' /etc/apt/sources.list \
&& apt-get -o Acquire::Retries=5 update \
&& apt-get -o Acquire::Retries=5 install -y --no-install-recommends --fix-missing \
ca-certificates \
python3.10 python3.10-distutils python3-pip \
ffmpeg fonts-dejavu-core \
&& ln -sf /usr/bin/python3.10 /usr/local/bin/python \
&& rm -rf /var/lib/apt/lists/*
COPY --from=node20 /usr/local /usr/local
COPY --from=builder /usr/local/lib/python3.10 /usr/local/lib/python3.10
COPY --from=builder /usr/local/bin /usr/local/bin
COPY --from=builder /app /app
EXPOSE 3000
CMD ["node", "/app/server/index.js"]

View File

@@ -5,6 +5,8 @@
- Output: a 3-scene narrated video `final_poc.mp4` (mock mode supported)
## Quick start (Docker)
`docker compose up` includes a **ComfyUI** service (pulled by default through a CN mirror: `docker.1ms.run/ardenius/comfyui-cpu:latest`). To use another registry or image, set `COMFYUI_IMAGE` in the environment.
Build:
```bash
@@ -47,3 +49,4 @@ Open `http://127.0.0.1:3000` and click “运行” to see `main.py --script-onl
- **apt**: TUNA Debian mirrors (baked into `Dockerfile`)
- **pip**: `PIP_INDEX_URL=https://pypi.tuna.tsinghua.edu.cn/simple`
- **npm**: `NPM_CONFIG_REGISTRY=https://registry.npmmirror.com`
- **docker images**: default base images now pull via `docker.m.daocloud.io` mirror

BIN
assets/demo.jpg Normal file

Binary file not shown.

After

Width:  |  Height:  |  Size: 150 KiB

52
clone.html Normal file

File diff suppressed because one or more lines are too long

View File

@@ -1,16 +1,48 @@
app:
# ComfyUI base url (local)
comfy_base_url: "http://127.0.0.1:8188"
# ComfyUI base url (docker internal service)
comfy_base_url: "http://comfyui:8188"
# ComfyUI output directory on the same machine running this code
comfy_output_dir: "./ComfyUI/output"
global:
# Used by prompt_injector + adapters.
style: ""
character: ""
negative_prompt: ""
llm:
# Controls /script + /refine generation.
provider: "mock" # "openai" to enable OpenAI/DashScope calls
image:
provider: "mock" # "mock" | "comfy" | "replicate" | "openai"
# Generic model name (used by some providers as fallback).
model: ""
replicate:
# Example: "stability-ai/sdxl"
model: "stability-ai/sdxl"
openai:
# Example: "gpt-image-1"
model: "gpt-image-1"
image_fallback:
provider: "mock"
video:
provider: "moviepy"
tts:
provider: "edge"
openai:
# Prefer environment variables in real deployments.
# OPENAI_API_KEY must be set; OPENAI_BASE_URL optional (for DeepSeek / other gateways).
api_key_env: "OPENAI_API_KEY"
base_url_env: "OPENAI_BASE_URL"
api_key_env: "sk-85880595fc714d63bfd0b025e917bd26"
base_url_env: "https://dashscope.aliyuncs.com/compatible-mode/v1"
# Example: "gpt-4o-mini" / "gpt-4o" / gateway specific names
model: "gpt-4o-mini"
model: "qwen3.5-plus"
script_gen:
# Narration length constraint per scene (Chinese chars approx)
@@ -26,7 +58,7 @@ tts:
video:
# Final output path
final_output: "./final_poc.mp4"
final_output: "./outputs/final_poc.mp4"
# If ComfyUI is not ready, generate mock clips with this size & fps
mock_size: [1024, 576]
mock_fps: 24

26
dev.sh
View File

@@ -18,7 +18,31 @@ shift || true
case "$CMD" in
up)
docker compose up --build "$@"
# Start in background, then wait for Node self-check + health endpoint.
docker compose up -d --build "$@"
echo "[dev] waiting for server health..."
# ComfyUI first startup may take longer while preparing custom nodes.
HEALTH_TIMEOUT_SECONDS="${HEALTH_TIMEOUT_SECONDS:-300}"
deadline=$((SECONDS + HEALTH_TIMEOUT_SECONDS))
ok=0
while [ $SECONDS -lt $deadline ]; do
if curl -fsS "http://127.0.0.1:3000/api/health" >/dev/null 2>&1; then
ok=1
break
fi
# Fail fast only if container actually exited (avoid mis-detecting "starting" state).
if docker compose ps --status exited | grep -q "aivideo"; then
break
fi
sleep 1
done
if [ "$ok" -ne 1 ]; then
echo "[dev] server failed to become healthy (self-check likely failed)." >&2
docker compose logs --tail=200 aivideo || true
exit 1
fi
echo "[dev] server ready: http://127.0.0.1:3000"
docker compose logs -f --tail=50 aivideo
;;
rebuild)
docker compose build "$@"

View File

@@ -2,13 +2,27 @@ services:
aivideo:
build: .
working_dir: /app
depends_on:
- comfyui
environment:
- OPENAI_API_KEY=${OPENAI_API_KEY}
- OPENAI_BASE_URL=${OPENAI_BASE_URL}
- OPENAI_API_KEY=${OPENAI_API_KEY:-}
- OPENAI_BASE_URL=${OPENAI_BASE_URL:-}
- PORT=3000
volumes:
- ./:/app
# Keep dependencies inside container volume to avoid host FS read issues on macOS.
- /app/server/node_modules
ports:
- "3000:3000"
# On macOS, use host.docker.internal to reach host services like ComfyUI.
# Example: set app.comfy_base_url in configs/config.yaml to http://host.docker.internal:8188
# Default: use domestic mirror to speed up pulls in CN networks.
# Override with COMFYUI_IMAGE to use another registry/image.
comfyui:
# CPU-friendly default image for non-NVIDIA development machines.
# Override with COMFYUI_IMAGE to switch back to a GPU image.
image: ${COMFYUI_IMAGE:-docker.1ms.run/ardenius/comfyui-cpu:latest}
# Force bind to all interfaces so other containers (and `check_comfy`) can reach it.
# Works with the default ardenius/comfyui-cpu image layout (/ComfyUI-cpu/main.py).
command: ${COMFYUI_COMMAND:-python3 /ComfyUI-cpu/main.py --cpu --cpu-vae --listen 0.0.0.0 --port 8188}
ports:
- "8188:8188"

View File

@@ -0,0 +1,61 @@
# AiVideo Studio Refactoring Plan (V2)
## Goals
- Match the reference editor's workbench layout: top navigation, central preview stage, bottom timeline, right-side parameter and log panels.
- Keep and strengthen existing capabilities: auto storyboarding, single-shot refinement, rendering, stage logs, task tracking.
- Restructure the frontend from "scattered flows" into a layered architecture, reducing duplicated logic and the risk of inconsistent state.
## New Architecture
### 1. UI Shell Layer
- `TopNav`: branding, quick actions, Play/Render entry points.
- `ToolBar`: contextual status (task/mock) and tool slots.
- `PreviewStage`: shows the current scene preview or the final video.
- `TimelineStrip`: shot-thumbnail timeline with a progress bar.
- `SceneEditor`: single-shot editing and refinement entry point.
### 2. State Layer (StudioState)
- Workbench state is managed by a single `useReducer`.
- Core state:
  - `scenes`
  - `selectedSceneIndex`
  - `stageState`
  - `stageLogs`
  - `renderProgress`
  - `taskId`
  - `finalVideoUrl`
  - `toast`
- All mutations go through `dispatch(action)` into the reducer, avoiding the races and duplication caused by scattered `useState` calls.
### 3. API/Stream Layer (StudioAPI)
- `startScriptStream()`: receives Script-stage events via an EventSource stream.
- `postStream()`: shared fetch + SSE stream handling for Refine/Render.
- `consumeFetchSSE()`: abstracts SSE chunk parsing, removing duplicated parsing code.
### 4. Event Orchestration Layer
- `onStageEvent(sourceStage, event, data)` routes all events uniformly:
  - `task`
  - `stage_update`
  - `line`
  - `error`
  - `done`
- `applyStageUpdate()` is the single update point for stage status, progress, and scene upserts.
## Data Flow
1. The user edits the prompt / provider parameters.
2. A debounce automatically triggers `startScript()`, which enters the Script SSE stream.
3. `stage_update.scene_json` keeps updating `scenes`; the timeline and the right-side editor refresh in sync.
4. The user can edit and `refine` the currently selected scene.
5. The user triggers `render`; progress and logs stream back in real time, and the final video URL is attached on completion.
## Compatibility Notes
- The page still uses `babel-standalone@6`, so the JSX avoids `<>...</>` and uses `React.Fragment` instead.
- The existing backend API contract is unchanged:
  - `GET /api/script`
  - `POST /api/refine`
  - `POST /api/render`
## Follow-up Suggestions
- Split the current single-file page into a real multi-file frontend project (Vite + React), introducing TypeScript and unit tests.
- Add drag-to-reorder and clip-duration editing to the timeline.
- Add right-side "asset library" and "template library" panels to move further toward the reference product's full experience.

View File

@@ -0,0 +1 @@

View File

@@ -0,0 +1 @@

View File

@@ -0,0 +1,9 @@
from __future__ import annotations
from pathlib import Path
class BaseImageGen:
    def generate(self, prompt: dict[str, str], output_dir: str | Path) -> str:
        raise NotImplementedError

View File

@@ -0,0 +1,36 @@
from __future__ import annotations
from pathlib import Path
from typing import Any
from engine.comfy_client import generate_image as comfy_generate_image
from engine.config import AppConfig
from .base import BaseImageGen
from .mock_adapter import MockImageGen
class ComfyAdapter(BaseImageGen):
    def __init__(self, cfg: AppConfig):
        self.cfg = cfg
        self.fallback = MockImageGen()
    def generate(self, prompt: dict[str, str], output_dir: str | Path) -> str:
        positive = str(prompt.get("positive", "") or "")
        negative = str(prompt.get("negative", "") or "")
        try:
            return str(
                comfy_generate_image(
                    positive,
                    output_dir,
                    negative_text=negative or None,
                    cfg=self.cfg,
                    timeout_s=60,
                    retry=2,
                    filename_prefix="shot",
                )
            )
        except Exception:
            # Let render_pipeline apply the configured fallback.
            raise

View File

@@ -0,0 +1,45 @@
from __future__ import annotations
import os
import uuid
from pathlib import Path
from urllib.request import urlopen
from PIL import Image
from .base import BaseImageGen
ASSETS_DIR = "assets"
DEMO_IMAGE = os.path.join(ASSETS_DIR, "demo.jpg")
def ensure_demo_image() -> None:
os.makedirs(ASSETS_DIR, exist_ok=True)
if os.path.exists(DEMO_IMAGE):
return
url = "https://picsum.photos/1280/720"
with urlopen(url, timeout=30) as resp:
data = resp.read()
with open(DEMO_IMAGE, "wb") as f:
f.write(data)
class MockImageGen(BaseImageGen):
def generate(self, prompt: dict[str, str], output_dir: str | Path) -> str:
# prompt is accepted for interface consistency; mock uses only demo.jpg.
_ = prompt
ensure_demo_image()
out_dir = Path(output_dir)
out_dir.mkdir(parents=True, exist_ok=True)
out_path = out_dir / f"shot_{uuid.uuid4().hex}.png"
try:
# Convert to PNG so verification criteria can match *.png.
img = Image.open(DEMO_IMAGE).convert("RGB")
img.save(str(out_path), format="PNG")
except Exception:
# Last-resort: if PNG conversion fails, still write a best-effort copy.
out_path.write_bytes(Path(DEMO_IMAGE).read_bytes())
return str(out_path)

View File

@@ -0,0 +1,83 @@
from __future__ import annotations
import os
import uuid
from io import BytesIO
from pathlib import Path
from typing import Any
import requests
from PIL import Image
from engine.config import AppConfig
from .base import BaseImageGen
class OpenAIImageAdapter(BaseImageGen):
    """
    Optional image provider adapter using the OpenAI Images API (or OpenAI-compatible gateways).
    Requires the `openai` python package and a configured API key via environment variables.
    """
    def __init__(self, cfg: AppConfig):
        self.cfg = cfg
        # Expected keys (configurable):
        # - image.openai.model
        # - openai.api_key_env / openai.base_url_env (reuses existing engine/script_gen config fields)
        self.model = str(cfg.get("image.openai.model", cfg.get("image.model", ""))).strip()
        if not self.model:
            raise ValueError("OpenAIImageAdapter requires `image.openai.model` (or `image.model`).")
        api_key_env_or_literal = str(cfg.get("openai.api_key_env", "OPENAI_API_KEY") or "OPENAI_API_KEY").strip()
        # Support both:
        # - an env var name (e.g. OPENAI_API_KEY)
        # - a literal API key (e.g. starts with `sk-...`) for quick local POCs.
        if api_key_env_or_literal.startswith("sk-"):
            api_key = api_key_env_or_literal
        else:
            api_key = os.environ.get(api_key_env_or_literal)
        if not api_key:
            raise RuntimeError(f"OpenAIImageAdapter missing API key: `{api_key_env_or_literal}`")
        self.api_key = api_key
        base_url_env_or_literal = str(cfg.get("openai.base_url_env", "https://api.openai.com/v1")).strip()
        self.base_url = base_url_env_or_literal.rstrip("/") if base_url_env_or_literal else "https://api.openai.com/v1"
        # Lazy import to avoid a hard dependency for mock/comfy users.
        from openai import OpenAI  # type: ignore
        self.client = OpenAI(api_key=self.api_key, base_url=self.base_url)
    def generate(self, prompt: dict[str, str], output_dir: str | Path) -> str:
        output_dir = Path(output_dir)
        output_dir.mkdir(parents=True, exist_ok=True)
        positive = prompt.get("positive", "")
        negative = prompt.get("negative", "")
        # The OpenAI Images API generally doesn't expose a dedicated negative_prompt field.
        # To keep interface consistency, embed negative hints into the prompt text.
        if negative:
            prompt_text = f"{positive}\nNegative prompt: {negative}"
        else:
            prompt_text = positive
        result = self.client.images.generate(model=self.model, prompt=prompt_text)
        # OpenAI SDK: result.data[0].url
        url: str | None = None
        try:
            url = result.data[0].url  # type: ignore[attr-defined]
        except Exception:
            pass
        if not url:
            raise RuntimeError("OpenAIImageAdapter unexpected response: missing image url")
        r = requests.get(url, timeout=60)
        r.raise_for_status()
        out_path = output_dir / f"shot_{uuid.uuid4().hex}.png"
        img = Image.open(BytesIO(r.content)).convert("RGB")
        img.save(str(out_path), format="PNG")
        return str(out_path)

View File

@@ -0,0 +1,60 @@
from __future__ import annotations
import uuid
from io import BytesIO
from pathlib import Path
from typing import Any
import requests
from PIL import Image
from engine.config import AppConfig
from .base import BaseImageGen
class ReplicateAdapter(BaseImageGen):
    def __init__(self, cfg: AppConfig):
        self.cfg = cfg
        # Expected: image.replicate.model
        self.model = str(cfg.get("image.replicate.model", cfg.get("image.model", ""))).strip()
        if not self.model:
            raise ValueError("ReplicateAdapter requires `image.replicate.model` (or `image.model`).")
        # Import lazily so that environments without replicate installed can still run with mock/comfy.
        import replicate  # type: ignore
        self.replicate = replicate
    def generate(self, prompt: dict[str, str], output_dir: str | Path) -> str:
        output_dir = Path(output_dir)
        output_dir.mkdir(parents=True, exist_ok=True)
        input_payload: dict[str, Any] = {
            "prompt": prompt.get("positive", ""),
            "negative_prompt": prompt.get("negative", ""),
        }
        # replicate.run is synchronous; the SDK waits for the prediction to finish.
        output = self.replicate.run(self.model, input=input_payload)
        # Common shapes: [url, ...] or dict-like.
        image_url = None
        if isinstance(output, list) and output:
            image_url = output[0]
        elif isinstance(output, dict):
            image_url = output.get("image") or output.get("output") or output.get("url")
        if not isinstance(image_url, str) or not image_url:
            raise RuntimeError(f"Unexpected Replicate output shape: {type(output)}")
        r = requests.get(image_url, timeout=60)
        r.raise_for_status()
        # Always output PNG to satisfy downstream validation `outputs/{task_id}/*.png`.
        out_path = output_dir / f"shot_{uuid.uuid4().hex}.png"
        # Pillow needs a file-like object; wrap the response bytes in a buffer.
        img = Image.open(BytesIO(r.content)).convert("RGB")
        img.save(str(out_path), format="PNG")
        return str(out_path)

View File

@@ -0,0 +1,21 @@
from __future__ import annotations
from pathlib import Path
from engine.config import AppConfig
from .base import BaseImageGen
class StabilityAdapter(BaseImageGen):
    """
    Placeholder for Stability AI image generation.
    Add implementation + dependencies when needed.
    """
    def __init__(self, cfg: AppConfig):
        self.cfg = cfg
    def generate(self, prompt: dict[str, str], output_dir: str | Path) -> str:
        raise NotImplementedError("StabilityAdapter not implemented yet")

View File

@@ -0,0 +1 @@

View File

@@ -0,0 +1,12 @@
from __future__ import annotations
from typing import Any
class BaseLLM:
    def generate_script(self, prompt: str, context: dict[str, Any] | None = None) -> Any:
        raise NotImplementedError
    def refine_scene(self, scene: Any, context: dict[str, Any] | None = None) -> Any:
        raise NotImplementedError

View File

@@ -0,0 +1,25 @@
from __future__ import annotations
from typing import Any
from engine.types import Scene
from .base import BaseLLM
class MockLLM(BaseLLM):
    def generate_script(self, prompt: str, context: dict[str, Any] | None = None) -> list[Scene]:
        # Simple deterministic scenes for offline development.
        prompt = (prompt or "").strip()
        if not prompt:
            prompt = "a warm city night"
        return [
            Scene(image_prompt=f"{prompt},城市夜景,霓虹灯,电影感", video_motion="缓慢推进镜头,轻微摇镜", narration="夜色温柔落在街灯上"),
            Scene(image_prompt=f"{prompt},咖啡店窗边,暖光,细雨", video_motion="侧向平移,人物轻轻抬头", narration="雨声里藏着一段回忆"),
            Scene(image_prompt=f"{prompt},桥上远景,车流光轨,温暖", video_motion="拉远全景,光轨流动", narration="我们在光里学会告别"),
        ]
    def refine_scene(self, scene: Scene, context: dict[str, Any] | None = None) -> Scene:
        # Minimal polish: append a hint.
        return Scene(image_prompt=scene.image_prompt, video_motion=scene.video_motion, narration=(scene.narration + "(更凝练)")[:30])

View File

@@ -0,0 +1,29 @@
from __future__ import annotations
from typing import Any
from engine.config import AppConfig
from engine.script_gen import generate_scenes, refine_scene
from .base import BaseLLM
class OpenAIAdapter(BaseLLM):
    def __init__(self, cfg: AppConfig):
        self.cfg = cfg
    def generate_script(self, prompt: str, context: dict[str, Any] | None = None):
        # Existing script_gen already enforces JSON schema and length constraints.
        return generate_scenes(prompt, self.cfg)
    def refine_scene(self, scene: Any, context: dict[str, Any] | None = None):
        if context is None:
            context = {}
        # Context carries the values needed to call refine_scene in script_gen.
        scenes = context.get("scenes")
        prompt2 = context.get("prompt")
        target_index = context.get("target_index")
        if scenes is None or prompt2 is None or target_index is None:
            raise ValueError("OpenAIAdapter.refine_scene missing context: scenes/prompt/target_index")
        return refine_scene(prompt=prompt2, scenes=scenes, target_index=int(target_index), cfg=self.cfg)

View File

@@ -0,0 +1 @@

View File

@@ -0,0 +1,9 @@
from __future__ import annotations
from pathlib import Path
class BaseTTS:
    def generate(self, text: str, output_path: str | Path) -> str:
        raise NotImplementedError

View File

@@ -0,0 +1,28 @@
from __future__ import annotations
import asyncio
from pathlib import Path
from engine.audio_gen import synthesize_one
from engine.config import AppConfig
from .base import BaseTTS
class EdgeTTS(BaseTTS):
    def __init__(self, cfg: AppConfig):
        self.cfg = cfg
    def generate(self, text: str, output_path: str | Path) -> str:
        text = text or " "
        output_path = Path(output_path)
        voice = str(self.cfg.get("tts.voice", "zh-CN-XiaoxiaoNeural"))
        rate = str(self.cfg.get("tts.rate", "+0%"))
        volume = str(self.cfg.get("tts.volume", "+0%"))
        async def _run():
            asset = await synthesize_one(text, output_path, voice, rate, volume)
            return str(asset.path)
        return asyncio.run(_run())

View File

@@ -0,0 +1,15 @@
from __future__ import annotations
from pathlib import Path
from .base import BaseTTS
class MockTTS(BaseTTS):
    def generate(self, text: str, output_path: str | Path) -> str:
        # No-op for offline tests: returns a path to an empty file so the video adapter skips audio.
        output_path = Path(output_path)
        output_path.parent.mkdir(parents=True, exist_ok=True)
        output_path.write_bytes(b"")
        return str(output_path)

View File

@@ -0,0 +1 @@

View File

@@ -0,0 +1,9 @@
from __future__ import annotations
from pathlib import Path
class BaseVideoGen:
    def generate(self, image_path: str, prompt: dict, output_path: str | Path) -> str:
        raise NotImplementedError

View File

@@ -0,0 +1,18 @@
from __future__ import annotations
from pathlib import Path
from engine.config import AppConfig
from .base import BaseVideoGen
class LTXVideoGen(BaseVideoGen):
    def __init__(self, cfg: AppConfig):
        self.cfg = cfg
    def generate(self, image_path: str, prompt: dict, output_path: str | Path) -> str:
        # Reserved for the future: direct image->video generation (LTX / diffusion video).
        # The current project keeps clip generation via MoviePy for stability.
        raise NotImplementedError("LTXVideoGen is not implemented yet")

View File

@@ -0,0 +1,81 @@
from __future__ import annotations
import os
from pathlib import Path
from typing import Any
import numpy as np
from moviepy import AudioFileClip, VideoClip
from PIL import Image
from engine.config import AppConfig
from .base import BaseVideoGen
class MoviePyVideoGen(BaseVideoGen):
    def __init__(self, cfg: AppConfig):
        self.cfg = cfg
    def generate(self, image_path: str, prompt: dict, output_path: str | Path) -> str:
        output_path = Path(output_path)
        output_path.parent.mkdir(parents=True, exist_ok=True)
        # Required prompt fields for shot rendering.
        duration_s = float(prompt.get("duration_s", 3))
        fps = int(prompt.get("fps", self.cfg.get("video.mock_fps", 24)))
        audio_path = prompt.get("audio_path")
        # Clip resolution.
        size = prompt.get("size")
        if isinstance(size, (list, tuple)) and len(size) == 2:
            w, h = int(size[0]), int(size[1])
        else:
            mock_size = self.cfg.get("video.mock_size", [1024, 576])
            w, h = int(mock_size[0]), int(mock_size[1])
        base_img = Image.open(image_path).convert("RGB")
        def make_frame(t: float):
            # Slow zoom: scale from 1.00 to 1.03 over the clip, cropped back to (w, h).
            progress = float(t) / max(duration_s, 1e-6)
            progress = max(0.0, min(1.0, progress))
            scale = 1.0 + 0.03 * progress
            new_w = max(w, int(w * scale))
            new_h = max(h, int(h * scale))
            frame = base_img.resize((new_w, new_h), Image.LANCZOS)
            left = (new_w - w) // 2
            top = (new_h - h) // 2
            frame = frame.crop((left, top, left + w, top + h))
            return np.array(frame)
        video = VideoClip(make_frame, duration=duration_s, has_constant_size=True)
        # Optional audio.
        if audio_path and os.path.exists(str(audio_path)):
            a = AudioFileClip(str(audio_path))
            video = video.with_audio(a)
        else:
            a = None
        try:
            video.write_videofile(
                str(output_path),
                fps=fps,
                codec="libx264",
                audio_codec="aac",
                preset="veryfast",
                threads=2,
            )
        finally:
            try:
                video.close()
            except Exception:
                pass
            if a is not None:
                try:
                    a.close()
                except Exception:
                    pass
        return str(output_path)

27
engine/assembler.py Normal file
View File

@@ -0,0 +1,27 @@
from __future__ import annotations
from pathlib import Path
from moviepy import VideoFileClip, concatenate_videoclips
def assemble_clips(clips: list[str | Path], output_path: str | Path) -> Path:
    out = Path(output_path)
    out.parent.mkdir(parents=True, exist_ok=True)
    if not clips:
        raise ValueError("clips must not be empty")
    vclips: list[VideoFileClip] = []
    for c in clips:
        vclips.append(VideoFileClip(str(c)))
    final = concatenate_videoclips(vclips, method="compose")
    try:
        fps = vclips[0].fps if vclips and vclips[0].fps else 24
        final.write_videofile(str(out), codec="libx264", audio_codec="aac", fps=fps, preset="medium", threads=4)
    finally:
        final.close()
        for c in vclips:
            c.close()
    return out

View File

@@ -2,6 +2,7 @@ from __future__ import annotations
import asyncio
import json
import time
import uuid
from dataclasses import dataclass
from pathlib import Path
@@ -186,3 +187,215 @@ class ComfyClient:
# unreachable
# return ComfyResult(prompt_id=prompt_id, output_files=last_files)
# ---------------------------------------------------------------------------
# Minimal "text->image" helpers (used by shot rendering)
# ---------------------------------------------------------------------------
def _build_simple_workflow(
    prompt_text: str,
    *,
    seed: int,
    ckpt_name: str,
    width: int,
    height: int,
    steps: int = 20,
    cfg: float = 8.0,
    sampler_name: str = "euler",
    scheduler: str = "normal",
    denoise: float = 1.0,
    filename_prefix: str = "shot",
    negative_text: str = "low quality, blurry",
) -> dict[str, Any]:
    # Best-effort workflow. If your ComfyUI nodes/models differ, generation falls back
    # (generate_image below retries across checkpoint candidates).
    return {
        "3": {
            "class_type": "KSampler",
            "inputs": {
                "seed": int(seed),
                "steps": int(steps),
                "cfg": float(cfg),
                "sampler_name": sampler_name,
                "scheduler": scheduler,
                "denoise": float(denoise),
                "model": ["4", 0],
                "positive": ["6", 0],
                "negative": ["7", 0],
                "latent_image": ["5", 0],
            },
        },
        "4": {
            "class_type": "CheckpointLoaderSimple",
            "inputs": {
                "ckpt_name": ckpt_name,
            },
        },
        "5": {
            "class_type": "EmptyLatentImage",
            "inputs": {
                "width": int(width),
                "height": int(height),
                "batch_size": 1,
            },
        },
        "6": {
            "class_type": "CLIPTextEncode",
            "inputs": {
                "text": prompt_text,
                "clip": ["4", 1],
            },
        },
        "7": {
            "class_type": "CLIPTextEncode",
            "inputs": {
                "text": negative_text,
                "clip": ["4", 1],
            },
        },
        "8": {
            "class_type": "VAEDecode",
            "inputs": {
                "samples": ["3", 0],
                "vae": ["4", 2],
            },
        },
        "9": {
            "class_type": "SaveImage",
            "inputs": {
                "images": ["8", 0],
                "filename_prefix": filename_prefix,
            },
        },
    }
def _queue_prompt(base_url: str, workflow: dict[str, Any], client_id: str) -> str:
    r = httpx.post(
        base_url.rstrip("/") + "/prompt",
        json={"prompt": workflow, "client_id": client_id},
        timeout=30.0,
    )
    r.raise_for_status()
    data = r.json()
    pid = data.get("prompt_id")
    if not isinstance(pid, str) or not pid:
        raise RuntimeError(f"Unexpected /prompt response: {data}")
    return pid
def _get_history_item(base_url: str, prompt_id: str) -> dict[str, Any] | None:
    # Prefer the per-prompt endpoint; fall back to the full history listing.
    for url in (f"{base_url.rstrip('/')}/history/{prompt_id}", f"{base_url.rstrip('/')}/history"):
        try:
            r = httpx.get(url, timeout=30.0)
            if r.status_code == 404:
                continue
            r.raise_for_status()
            data = r.json()
            if isinstance(data, dict):
                if prompt_id in data and isinstance(data[prompt_id], dict):
                    return data[prompt_id]
                if url.endswith(f"/{prompt_id}"):
                    return data
            return None
        except Exception:
            continue
    return None
def _extract_first_image_view_target(history_item: dict[str, Any]) -> tuple[str, str] | None:
    outputs = history_item.get("outputs")
    if not isinstance(outputs, dict):
        return None
    def walk(v: Any) -> list[dict[str, Any]]:
        # Collect any dicts carrying a non-empty "filename" field, at any depth.
        found: list[dict[str, Any]] = []
        if isinstance(v, dict):
            if isinstance(v.get("filename"), str) and v.get("filename").strip():
                found.append(v)
            for vv in v.values():
                found.extend(walk(vv))
        elif isinstance(v, list):
            for vv in v:
                found.extend(walk(vv))
        return found
    candidates = walk(outputs)
    for c in candidates:
        fn = str(c.get("filename", "")).strip()
        sf = str(c.get("subfolder", "") or "").strip()
        if fn:
            return fn, sf
    return None
def generate_image(
prompt_text: str,
output_dir: str | Path,
*,
cfg: AppConfig | None = None,
timeout_s: int = 60,
retry: int = 2,
width: int | None = None,
height: int | None = None,
filename_prefix: str = "shot",
ckpt_candidates: list[str] | None = None,
negative_text: str | None = None,
) -> Path:
cfg2 = cfg or AppConfig.load("./configs/config.yaml")
base_url = str(cfg2.get("app.comfy_base_url", "http://comfyui:8188")).rstrip("/")
out_dir = Path(output_dir)
out_dir.mkdir(parents=True, exist_ok=True)
if width is None or height is None:
mock_size = cfg2.get("video.mock_size", [1024, 576])
width = int(width or mock_size[0])
height = int(height or mock_size[1])
if negative_text is None:
negative_text = "low quality, blurry"
if ckpt_candidates is None:
ckpt_candidates = [
"v1-5-pruned-emaonly.ckpt",
"v1-5-pruned-emaonly.safetensors",
"sd-v1-5-tiny.safetensors",
]
last_err: Exception | None = None
for _attempt in range(max(1, retry)):
for ckpt_name in ckpt_candidates:
client_id = str(uuid.uuid4())
seed = int(uuid.uuid4().int % 2_147_483_647)
workflow = _build_simple_workflow(
prompt_text,
seed=seed,
ckpt_name=ckpt_name,
width=width,
height=height,
filename_prefix=filename_prefix,
negative_text=negative_text,
)
try:
prompt_id = _queue_prompt(base_url, workflow, client_id)
start = time.time()
while time.time() - start < timeout_s:
item = _get_history_item(base_url, prompt_id)
if isinstance(item, dict):
img_target = _extract_first_image_view_target(item)
if img_target:
filename, subfolder = img_target
view_url = f"{base_url}/view?filename={filename}&subfolder={subfolder}"
img_resp = httpx.get(view_url, timeout=60.0)
img_resp.raise_for_status()
image_path = out_dir / filename
image_path.write_bytes(img_resp.content)
return image_path
time.sleep(1.0)
last_err = last_err or TimeoutError(f"ComfyUI polling timed out after {timeout_s}s (ckpt={ckpt_name})")
except Exception as e:
last_err = e
continue
raise RuntimeError(f"ComfyUI image generation failed after retries: {last_err}")

43
engine/director.py Normal file


@@ -0,0 +1,43 @@
from __future__ import annotations
from typing import Any
def _read_scene(scene: Any) -> tuple[str, str, str]:
if hasattr(scene, "image_prompt") and hasattr(scene, "video_motion") and hasattr(scene, "narration"):
return (
str(getattr(scene, "image_prompt", "")).strip(),
str(getattr(scene, "video_motion", "")).strip(),
str(getattr(scene, "narration", "")).strip(),
)
if isinstance(scene, dict):
return (
str(scene.get("image_prompt", "")).strip(),
str(scene.get("video_motion", scene.get("motion", ""))).strip(),
str(scene.get("narration", scene.get("tts", ""))).strip(),
)
return ("", "", "")
def scenes_to_shots(scenes: list) -> list[dict[str, Any]]:
shots: list[dict[str, Any]] = []
for scene_idx, scene in enumerate(scenes, start=1):
image_prompt, motion, tts = _read_scene(scene)
scene_id = f"scene_{scene_idx:02d}"
shot_id = f"{scene_id}_01"
# Keep default duration simple and deterministic for MVP.
duration = 3
shots.append(
{
"shot_id": shot_id,
"scene_id": scene_id,
"duration": int(duration),
"image_prompt": image_prompt,
"motion": motion,
"camera": "",
"tts": tts,
"status": "pending",
}
)
return shots
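The mapping above turns N scenes into N single-shot entries with deterministic ids and a fixed duration. A minimal sketch of the same id/duration scheme, using dict-shaped scenes as the `_read_scene` fallback accepts:

```python
def scenes_to_shots_sketch(scenes: list[dict]) -> list[dict]:
    shots = []
    for i, s in enumerate(scenes, start=1):
        scene_id = f"scene_{i:02d}"
        shots.append({
            "shot_id": f"{scene_id}_01",   # one shot per scene in the MVP
            "scene_id": scene_id,
            "duration": 3,                 # fixed, deterministic default
            "image_prompt": s.get("image_prompt", "").strip(),
            "tts": s.get("narration", "").strip(),
            "status": "pending",
        })
    return shots

shots = scenes_to_shots_sketch([
    {"image_prompt": "city night", "narration": "hello"},
    {"image_prompt": "rainy cafe", "narration": "memory"},
])
```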

452
engine/main.py Normal file

@@ -0,0 +1,452 @@
from __future__ import annotations
import argparse
import asyncio
import json
import os
import random
import sys
from pathlib import Path
from typing import Any
from moviepy import ImageClip
from PIL import Image, ImageDraw, ImageFont
from engine.model_factory import get_model
from engine.prompt_injector import inject_prompt
from engine.adapters.image.mock_adapter import MockImageGen
from engine.assembler import assemble_clips
from engine.audio_gen import synthesize_scenes
from engine.comfy_client import ComfyClient
from engine.config import AppConfig
from engine.director import scenes_to_shots
from engine.shot_executor import render_shot
from engine.task_store import create_task, update_shot_status, update_task_status
from engine.types import Scene
from engine.video_editor import Segment, render_final
def _emit(line: str) -> None:
print(line, flush=True)
def _emit_scene(scene_idx: int, scene: Scene, extra: dict[str, Any] | None = None) -> None:
payload = {
"index": scene_idx,
"image_prompt": scene.image_prompt,
"video_motion": scene.video_motion,
"narration": scene.narration,
}
if extra:
payload.update(extra)
_emit("SCENE_JSON " + json.dumps(payload, ensure_ascii=False))
def _ensure_mock_image(path: Path, size: tuple[int, int]) -> Path:
if path.exists():
return path
path.parent.mkdir(parents=True, exist_ok=True)
img = Image.new("RGB", size, color=(20, 24, 33))
draw = ImageDraw.Draw(img)
text = "MOCK"
try:
font = ImageFont.load_default()
except Exception:
font = None
draw.text((size[0] // 2 - 30, size[1] // 2 - 10), text, fill=(240, 240, 240), font=font)
img.save(path)
return path
def _make_mock_video(out_path: Path, image_path: Path, duration_s: float, fps: int) -> Path:
out_path.parent.mkdir(parents=True, exist_ok=True)
clip = ImageClip(str(image_path)).with_duration(max(0.5, duration_s)).with_fps(fps)
try:
clip.write_videofile(str(out_path), codec="libx264", audio=False, fps=fps, preset="veryfast")
finally:
clip.close()
return out_path
def _prog(p: float, msg: str) -> None:
p2 = max(0.0, min(1.0, float(p)))
_emit("PROG " + json.dumps({"p": p2, "msg": msg}, ensure_ascii=False))
def _prog_shot(shot_id: str, status: str) -> None:
_emit(f"PROG_SHOT {shot_id} {status}")
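These emitters define the stdout protocol the Node gateway consumes: `PROG {json}`, `PROG_SHOT <shot_id> <status>`, plus `SCENE_JSON {json}` and `RENDER_DONE {json}`. The consumer-side split can be sketched as follows (the real gateway does this in JS; Python is used here only for illustration, and `LOG` is a made-up catch-all tag):

```python
import json

def parse_protocol_line(line: str) -> tuple[str, object]:
    """Split one engine stdout line into (event, payload)."""
    line = line.strip()
    if line.startswith("PROG_SHOT "):
        # Space-delimited, not JSON: "PROG_SHOT scene_01_01 done"
        _, shot_id, status = line.split(" ", 2)
        return "PROG_SHOT", {"shot_id": shot_id, "status": status}
    for tag in ("SCENE_JSON", "PROG", "RENDER_DONE"):
        if line.startswith(tag + " "):
            return tag, json.loads(line[len(tag) + 1:])
    return "LOG", line  # anything else is treated as plain log output

event, payload = parse_protocol_line('PROG {"p": 0.5, "msg": "Rendering"}')
```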
def _normalize_style(style: str | None) -> str:
s = (style or "").strip()
if not s:
return ""
# Allow both Chinese labels and simple aliases
mapping = {
"电影感": "电影感",
"cinema": "电影感",
"二次元": "二次元",
"anime": "二次元",
"写实": "写实",
"real": "写实",
}
return mapping.get(s, s)
def _inject_globals_into_prompt(prompt: str, *, style: str | None, character: str | None) -> str:
style_n = _normalize_style(style)
character_n = (character or "").strip()
if not style_n and not character_n:
return prompt
parts: list[str] = [prompt.strip(), "\n\n[Global Constraints]"]
if style_n:
parts.append(f"- Global Style: {style_n}")
if character_n:
parts.append(f"- Character Preset: {character_n}")
parts.append("请严格遵守上述全局信息,并保持三分镜主角一致。")
return "\n".join(parts).strip()
def _decorate_image_prompt(image_prompt: str, *, style: str | None, character: str | None) -> str:
# Industrial rule: final_prompt = f"{global_character}, {global_style}, {scene_prompt}"
style_n = _normalize_style(style)
character_n = (character or "").strip()
parts = []
if character_n:
parts.append(character_n)
if style_n:
parts.append(style_n)
parts.append(image_prompt)
return ", ".join([p for p in parts if p]).strip(", ")
def _fallback_scenes(prompt: str) -> list[Scene]:
return [
Scene(
image_prompt=f"{prompt},城市夜景,霓虹灯,电影感",
video_motion="缓慢推进镜头,轻微摇镜",
narration="夜色温柔落在街灯上",
),
Scene(
image_prompt=f"{prompt},咖啡店窗边,暖光,细雨",
video_motion="侧向平移,人物轻轻抬头",
narration="雨声里藏着一段回忆",
),
Scene(
image_prompt=f"{prompt},桥上远景,车流光轨,温暖",
video_motion="拉远全景,光轨流动",
narration="我们在光里学会告别",
),
]
def _generate_scene_preview(
*,
cfg: AppConfig,
out_dir: Path,
image_prompt: str,
style: str | None,
character: str | None,
) -> str | None:
try:
image_gen = get_model("image", cfg)
except Exception:
image_gen = get_model("image_fallback", cfg)
global_cfg = dict(cfg.get("global", {}) or {})
if style:
global_cfg["style"] = style
if character:
global_cfg["character"] = character
prompt_obj = inject_prompt(global_cfg, {"prompt": image_prompt})
try:
image_path = image_gen.generate(prompt_obj, out_dir)
except Exception:
try:
image_path = get_model("image_fallback", cfg).generate(prompt_obj, out_dir)
except Exception:
# Last-resort hard fallback: never block script stage due to preview failures.
image_path = MockImageGen().generate(prompt_obj, out_dir)
p = Path(str(image_path))
if not p.exists():
return None
return f"/api/static/{out_dir.name}/{p.name}"
def _has_llm_key(cfg: AppConfig) -> bool:
api_key_env = str(cfg.get("openai.api_key_env", "OPENAI_API_KEY") or "OPENAI_API_KEY").strip()
# Env var name case.
if os.environ.get(api_key_env):
return True
# Literal key case (DashScope / OpenAI-compatible).
if api_key_env.startswith("sk-"):
return True
return False
def _parse_scenes_from_obj(obj: Any) -> list[Scene]:
if not isinstance(obj, dict):
raise ValueError("payload must be object")
if "scene" in obj and obj.get("scene") is not None:
s = obj.get("scene")
if not isinstance(s, dict):
raise ValueError("payload.scene must be object")
return [
Scene(
image_prompt=str(s.get("image_prompt", "")).strip(),
video_motion=str(s.get("video_motion", "")).strip(),
narration=str(s.get("narration", "")).strip(),
)
]
scenes_raw = obj.get("scenes")
if not isinstance(scenes_raw, list) or not scenes_raw:
raise ValueError("payload.scenes must be non-empty array")
scenes: list[Scene] = []
for i, s in enumerate(scenes_raw, start=1):
if not isinstance(s, dict):
raise ValueError(f"scenes[{i}] must be object")
scenes.append(
Scene(
image_prompt=str(s.get("image_prompt", "")).strip(),
video_motion=str(s.get("video_motion", "")).strip(),
narration=str(s.get("narration", "")).strip(),
)
)
return scenes
async def _render_from_scenes(
prompt: str,
scenes: list[Scene],
cfg: AppConfig,
mock: bool,
*,
style: str | None,
character: str | None,
out_dir: Path,
) -> Path:
# Force-inject globals into image prompts for rendering.
scenes2 = [
Scene(
image_prompt=_decorate_image_prompt(s.image_prompt, style=style, character=character),
video_motion=s.video_motion,
narration=s.narration,
)
for s in scenes
]
_prog(0.15, "Generating TTS")
audios = await synthesize_scenes([s.narration for s in scenes2], cfg)
segments: list[Segment] = []
fps = int(cfg.get("video.mock_fps", 24))
mock_size = cfg.get("video.mock_size", [1024, 576])
w, h = int(mock_size[0]), int(mock_size[1])
mock_image = _ensure_mock_image(Path("./assets/mock.png"), (w, h))
if mock:
_prog(0.35, "Generating mock videos")
for i, (scene, audio) in enumerate(zip(scenes2, audios), start=1):
vpath = Path("./assets/mock_videos") / f"scene_{i:02d}.mp4"
_make_mock_video(vpath, mock_image, audio.duration_s, fps=fps)
segments.append(Segment(video_path=vpath, audio_path=audio.path, narration=scene.narration))
_prog(0.85, "Compositing final video")
out_path = out_dir / "final.mp4"
return render_final(segments, cfg, output_path=out_path)
comfy = ComfyClient(cfg)
wf = comfy.load_workflow()
for i, (scene, audio) in enumerate(zip(scenes2, audios), start=1):
_prog(0.25 + 0.45 * (i - 1) / max(1, len(scenes2)), f"Rendering scene {i} with ComfyUI")
seed = random.randint(1, 2_147_483_647)
wf_i = comfy.inject_params(wf, image_prompt=scene.image_prompt, seed=seed, motion_prompt=scene.video_motion or None)
result = await comfy.run_workflow(wf_i)
candidates = [p for p in result.output_files if p.suffix.lower() in {".mp4", ".mov", ".webm"}]
if not result.output_files:
raise RuntimeError(f"ComfyUI returned no output files for scene {i}")
video_path = candidates[0] if candidates else result.output_files[0]
segments.append(Segment(video_path=video_path, audio_path=audio.path, narration=scene.narration))
_prog(0.85, "Compositing final video")
out_path = out_dir / "final.mp4"
return render_final(segments, cfg, output_path=out_path)
def _read_stdin_json() -> Any:
raw = sys.stdin.read()
if not raw.strip():
return None
return json.loads(raw)
def step_script(prompt: str, cfg: AppConfig, mock: bool, *, style: str | None, character: str | None, out_dir: Path) -> int:
prompt2 = _inject_globals_into_prompt(prompt, style=style, character=character)
if mock and not _has_llm_key(cfg):
# Fallback scenes still receive global injection via _decorate_image_prompt below.
scenes = _fallback_scenes(prompt)
else:
llm = get_model("llm", cfg)
scenes = llm.generate_script(prompt2, context=None)
out_dir.mkdir(parents=True, exist_ok=True)
_emit("SCRIPT_BEGIN")
for idx, s in enumerate(scenes, start=1):
s2 = Scene(
image_prompt=_decorate_image_prompt(s.image_prompt, style=style, character=character),
video_motion=s.video_motion,
narration=s.narration,
)
preview_url = _generate_scene_preview(
cfg=cfg,
out_dir=out_dir,
image_prompt=s2.image_prompt,
style=style,
character=character,
)
_emit_scene(idx, s2, extra={"preview_url": preview_url or ""})
_emit("SCRIPT_END")
(out_dir / "scenes.json").write_text(
json.dumps(
{"scenes": [{"image_prompt": s.image_prompt, "video_motion": s.video_motion, "narration": s.narration} for s in scenes]},
ensure_ascii=False,
indent=2,
),
encoding="utf-8",
)
return 0
def step_refine(
prompt: str,
cfg: AppConfig,
mock: bool,
scene_index: int,
*,
style: str | None,
character: str | None,
out_dir: Path,
) -> int:
prompt2 = _inject_globals_into_prompt(prompt, style=style, character=character)
payload = _read_stdin_json()
scenes = _parse_scenes_from_obj(payload)
# If client only sent one scene, treat it as the target scene.
if len(scenes) == 1:
target_index = 1
else:
target_index = scene_index
if not (1 <= target_index <= len(scenes)):
raise ValueError("scene_index out of range")
if mock and not _has_llm_key(cfg):
# Simple fallback: append a tiny polish hint to narration
s = scenes[target_index - 1]
refined = Scene(
image_prompt=_decorate_image_prompt(s.image_prompt, style=style, character=character),
video_motion=s.video_motion,
narration=(s.narration + "(更凝练)")[:30],
)
else:
llm = get_model("llm", cfg)
# Context carries prompt + scenes for consistent refinement.
refined0 = llm.refine_scene(scenes[target_index - 1], context={"prompt": prompt2, "scenes": scenes, "target_index": target_index})
refined = Scene(
image_prompt=_decorate_image_prompt(refined0.image_prompt, style=style, character=character),
video_motion=refined0.video_motion,
narration=refined0.narration,
)
# Keep the original index for frontend replacement.
preview_url = _generate_scene_preview(
cfg=cfg,
out_dir=out_dir,
image_prompt=refined.image_prompt,
style=style,
character=character,
)
_emit_scene(scene_index, refined, extra={"preview_url": preview_url or ""})
out_dir.mkdir(parents=True, exist_ok=True)
(out_dir / f"refine_scene_{scene_index}.json").write_text(
json.dumps(
{"index": scene_index, "image_prompt": refined.image_prompt, "video_motion": refined.video_motion, "narration": refined.narration},
ensure_ascii=False,
indent=2,
),
encoding="utf-8",
)
return 0
def step_render(prompt: str, cfg: AppConfig, mock: bool, *, style: str | None, character: str | None, out_dir: Path) -> int:
payload = _read_stdin_json()
scenes_raw = _parse_scenes_from_obj(payload)
scenes = [
Scene(
image_prompt=_decorate_image_prompt(s.image_prompt, style=style, character=character),
video_motion=s.video_motion,
narration=s.narration,
)
for s in scenes_raw
]
shots = scenes_to_shots(scenes)
out_dir.mkdir(parents=True, exist_ok=True)
task_id = out_dir.name
create_task(task_id, shots)
update_task_status(task_id, "running")
_prog(0.05, "Start render")
clips: list[str] = []
total = max(1, len(shots))
try:
for idx, shot in enumerate(shots, start=1):
shot_id = str(shot.get("shot_id", f"shot_{idx:02d}"))
update_shot_status(task_id, shot_id, "running")
_prog_shot(shot_id, "running")
clip_path = render_shot(shot, out_dir, cfg, mock=mock)
clips.append(clip_path)
update_shot_status(task_id, shot_id, "done")
_prog_shot(shot_id, "done")
_prog(0.05 + 0.8 * idx / total, f"Rendered shot {idx}/{total}")
final_out = out_dir / "final.mp4"
out = assemble_clips(clips, final_out)
update_task_status(task_id, "done")
_prog(1.0, "Render finished")
_emit("RENDER_DONE " + json.dumps({"output": str(out)}, ensure_ascii=False))
return 0
except Exception:
update_task_status(task_id, "failed")
raise
def main() -> int:
parser = argparse.ArgumentParser(description="AIGC interactive POC entry")
parser.add_argument("--prompt", required=True, help="User creative prompt")
parser.add_argument("--config", default="./configs/config.yaml", help="Config yaml path")
parser.add_argument("--mock", action="store_true", help="Mock mode (no ComfyUI needed)")
parser.add_argument("--step", default="script", choices=["script", "render", "refine"])
parser.add_argument("--scene-index", type=int, default=1, help="For --step=refine only (1-based)")
parser.add_argument("--global-style", default="", help="Global style lock (e.g. 电影感/二次元/写实)")
parser.add_argument("--character", default="", help="Character preset lock (main character description)")
parser.add_argument("--task-id", required=True, help="Task id (UUID). Outputs go to outputs/{task_id}/")
args = parser.parse_args()
cfg = AppConfig.load(args.config)
out_dir = Path("./outputs") / str(args.task_id)
if args.step == "script":
return step_script(args.prompt, cfg, mock=args.mock, style=args.global_style, character=args.character, out_dir=out_dir)
if args.step == "render":
return step_render(args.prompt, cfg, mock=args.mock, style=args.global_style, character=args.character, out_dir=out_dir)
if args.step == "refine":
return step_refine(
args.prompt,
cfg,
mock=args.mock,
scene_index=args.scene_index,
style=args.global_style,
character=args.character,
out_dir=out_dir,
)
raise SystemExit(2)
if __name__ == "__main__":
raise SystemExit(main())

80
engine/model_factory.py Normal file

@@ -0,0 +1,80 @@
from __future__ import annotations
import os
from typing import Any
from engine.config import AppConfig
def _provider(cfg: AppConfig, path: str, default: str) -> str:
env_map = {
"llm.provider": "ENGINE_LLM_PROVIDER",
"image.provider": "ENGINE_IMAGE_PROVIDER",
"image_fallback.provider": "ENGINE_IMAGE_FALLBACK_PROVIDER",
"video.provider": "ENGINE_VIDEO_PROVIDER",
"tts.provider": "ENGINE_TTS_PROVIDER",
}
env_key = env_map.get(path)
if env_key:
env_val = str(os.environ.get(env_key, "")).strip()
if env_val:
return env_val
v = cfg.get(path, default)
return str(v or default).strip() or default
def get_model(name: str, cfg: AppConfig) -> Any:
if name == "llm":
provider = _provider(cfg, "llm.provider", "openai")
if provider == "mock":
from engine.adapters.llm.mock_adapter import MockLLM
return MockLLM()
from engine.adapters.llm.openai_adapter import OpenAIAdapter
return OpenAIAdapter(cfg)
if name in ("image", "image_fallback"):
section = "image" if name == "image" else "image_fallback"
# Important: fallback must default to mock, not follow primary image provider.
provider_default = "mock" if name == "image_fallback" else _provider(cfg, "image.provider", "mock")
provider = _provider(cfg, f"{section}.provider", provider_default)
if provider == "comfy":
from engine.adapters.image.comfy_adapter import ComfyAdapter
return ComfyAdapter(cfg)
if provider == "replicate":
from engine.adapters.image.replicate_adapter import ReplicateAdapter
return ReplicateAdapter(cfg)
if provider == "openai":
from engine.adapters.image.openai_image_adapter import OpenAIImageAdapter
return OpenAIImageAdapter(cfg)
from engine.adapters.image.mock_adapter import MockImageGen
return MockImageGen()
if name == "video":
provider = _provider(cfg, "video.provider", "moviepy")
if provider == "ltx":
from engine.adapters.video.ltx_adapter import LTXVideoGen
return LTXVideoGen(cfg)
from engine.adapters.video.moviepy_adapter import MoviePyVideoGen
return MoviePyVideoGen(cfg)
if name == "tts":
provider = _provider(cfg, "tts.provider", "edge")
if provider == "mock":
from engine.adapters.tts.mock_adapter import MockTTS
return MockTTS()
from engine.adapters.tts.edge_adapter import EdgeTTS
return EdgeTTS(cfg)
raise ValueError(f"Unknown model adapter name: {name}")
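The factory resolves each provider with env-over-config precedence, so a deployment can flip e.g. `ENGINE_IMAGE_PROVIDER=comfy` without editing YAML. A self-contained sketch of that precedence rule (the env name matches the map above; the plain dict stands in for `AppConfig`):

```python
import os

ENV_MAP = {"image.provider": "ENGINE_IMAGE_PROVIDER"}

def resolve_provider(config: dict, path: str, default: str) -> str:
    """Env var wins; then the config value; then the hard default."""
    env_key = ENV_MAP.get(path)
    env_val = os.environ.get(env_key, "").strip() if env_key else ""
    if env_val:
        return env_val
    return str(config.get(path, default) or default).strip() or default

cfg = {"image.provider": "mock"}
from_cfg = resolve_provider(cfg, "image.provider", "mock")
os.environ["ENGINE_IMAGE_PROVIDER"] = "comfy"
from_env = resolve_provider(cfg, "image.provider", "mock")
del os.environ["ENGINE_IMAGE_PROVIDER"]
```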

23
engine/prompt_injector.py Normal file

@@ -0,0 +1,23 @@
from __future__ import annotations
from typing import Any
def inject_prompt(global_cfg: dict[str, Any] | None, scene: dict[str, Any]) -> dict[str, str]:
"""
Unified positive/negative prompt builder.
Note: current pipeline already injects some globals into `scene["image_prompt"]`.
"""
global_cfg = global_cfg or {}
character = str(global_cfg.get("character", "") or "").strip()
style = str(global_cfg.get("style", "") or "").strip()
negative = str(global_cfg.get("negative_prompt", "") or "").strip()
base = str(scene.get("prompt") or scene.get("image_prompt") or "").strip()
positive_parts = [p for p in [character, style, base] if p]
positive = ", ".join(positive_parts).strip(", ")
return {"positive": positive, "negative": negative}
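The builder composes the positive prompt as `character, style, scene` and passes the negative prompt through untouched. A quick illustration of the rule with made-up globals:

```python
def build_prompts(global_cfg: dict, scene: dict) -> dict:
    """Mirror of the positive/negative composition rule described above."""
    character = str(global_cfg.get("character", "") or "").strip()
    style = str(global_cfg.get("style", "") or "").strip()
    base = str(scene.get("prompt") or scene.get("image_prompt") or "").strip()
    positive = ", ".join(p for p in (character, style, base) if p)
    negative = str(global_cfg.get("negative_prompt", "") or "").strip()
    return {"positive": positive, "negative": negative}

prompts = build_prompts(
    {"character": "a young explorer", "style": "cinematic", "negative_prompt": "blurry"},
    {"prompt": "city night, neon lights"},
)
```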

80
engine/render_pipeline.py Normal file

@@ -0,0 +1,80 @@
from __future__ import annotations
from pathlib import Path
from typing import Any
from engine.model_factory import get_model
from engine.prompt_injector import inject_prompt
from engine.adapters.image.mock_adapter import MockImageGen
def render_shot(shot: dict[str, Any], cfg, out_dir: str | Path, *, mock: bool = False) -> str:
out_dir = Path(out_dir)
clips_dir = out_dir / "clips"
audio_dir = out_dir / "audio"
clips_dir.mkdir(parents=True, exist_ok=True)
audio_dir.mkdir(parents=True, exist_ok=True)
shot_id = str(shot.get("shot_id", "unknown"))
duration_s = float(shot.get("duration", 3))
narration = str(shot.get("tts", "")).strip()
# Models from config.
image_fallback_gen = get_model("image_fallback", cfg)
try:
image_gen = get_model("image", cfg)
except Exception as e:
# Covers missing optional deps at adapter init time (e.g. replicate/openai packages).
print(f"[WARN] image provider init failed, fallback to image_fallback: {e}")
image_gen = image_fallback_gen
tts = get_model("tts", cfg)
video_gen = get_model("video", cfg)
# Prompt injection.
global_cfg = cfg.get("global", {}) if hasattr(cfg, "get") else {}
prompt_obj = inject_prompt(global_cfg, {"prompt": shot.get("image_prompt", "")})
positive_prompt = prompt_obj.get("positive", "")
# Prompt enrichment: keeps ComfyUI generations cinematic and detailed.
enrich_style = "cinematic, ultra realistic, 4k, detailed lighting"
if enrich_style not in positive_prompt:
positive_prompt = f"{positive_prompt}, {enrich_style}".strip(", ")
prompt_obj["positive"] = positive_prompt
# 1) image
try:
image_path = image_gen.generate(prompt_obj, out_dir)
except Exception as e:
# Config-driven fallback; keeps provider switching non-invasive.
print(f"[WARN] Image generation failed, fallback to image_fallback: {e}")
try:
image_path = image_fallback_gen.generate(prompt_obj, out_dir)
except Exception as e2:
print(f"[WARN] image_fallback also failed, hard fallback to mock: {e2}")
image_path = MockImageGen().generate(prompt_obj, out_dir)
scene_label = str(shot.get("scene_id") or shot.get("shot_id") or "scene_unknown")
print(f"[SHOT_RENDER] {scene_label} -> image generated: {image_path}")
# 2) audio (optional)
audio_path = None
if narration:
# Use a stable per-shot audio filename.
ap = audio_dir / f"shot_{shot_id}.mp3"
try:
audio_path = tts.generate(narration, ap)
except Exception as e:
# Don't fail the whole render due to TTS issues.
print(f"[WARN] TTS failed, continue without audio: {e}")
audio_path = None
# 3) clip
clip_out = clips_dir / f"shot_{shot_id}.mp4"
prompt = {
"duration_s": duration_s,
"fps": int(cfg.get("video.mock_fps", 24)),
"audio_path": audio_path,
"size": cfg.get("video.mock_size", None),
}
clip_path = video_gen.generate(image_path, prompt, clip_out)
return clip_path


@@ -10,6 +10,38 @@ from .config import AppConfig
from .types import Scene
def _looks_like_api_key(v: str) -> bool:
vv = (v or "").strip()
# Common prefixes: DashScope uses "sk-..."; we keep it minimal and permissive.
return bool(vv) and vv.startswith("sk-")
def _looks_like_url(v: str) -> bool:
vv = (v or "").strip()
return vv.startswith("http://") or vv.startswith("https://")
def _resolve_openai_credentials(cfg: AppConfig) -> tuple[str, str | None]:
api_key_env = str(cfg.get("openai.api_key_env", "OPENAI_API_KEY") or "").strip()
base_url_env = str(cfg.get("openai.base_url_env", "OPENAI_BASE_URL") or "").strip()
# 1) Resolve api_key: allow both "env var name" and "literal key" for safety.
api_key = os.environ.get(api_key_env) if api_key_env else None
if not api_key and api_key_env and _looks_like_api_key(api_key_env):
api_key = api_key_env
if not api_key:
raise RuntimeError(f"Missing OpenAI compatible API key (env={api_key_env})")
# 2) Resolve base_url: allow both "env var name" and "literal URL".
base_url = os.environ.get(base_url_env) if base_url_env else None
if not base_url and base_url_env and _looks_like_url(base_url_env):
base_url = base_url_env
if base_url:
base_url = str(base_url).strip() or None
return str(api_key), base_url
def _system_prompt(scene_count: int, min_chars: int, max_chars: int) -> str:
return f"""你是一个专业短视频编剧与分镜师。
请把用户的创意扩展为 {scene_count} 个分镜(Scene) 的 JSON。
@@ -33,22 +65,36 @@ def _system_prompt(scene_count: int, min_chars: int, max_chars: int) -> str:
"""
def _refine_system_prompt(min_chars: int, max_chars: int) -> str:
return f"""你是短视频分镜润色助手。
你会收到用户的原始创意 prompt,以及一组三分镜(其中主角设定需一致)。
你的任务:只润色指定的一个 Scene,使其更具体、更镜头化、更适合生成视频,同时保持主角描述与其它分镜一致。
硬性约束:
1) 只修改目标 Scene,不要改其它 Scene。
2) 目标 Scene 必须包含:image_prompt, video_motion, narration。
3) narration 为中文旁白,每段控制在约 {min_chars}-{max_chars} 字左右。
4) 输出只允许 JSON,不要解释、不要 markdown。
输出 JSON Schema
{{
"scene": {{"image_prompt":"...","video_motion":"...","narration":"..."}}
}}
"""
def generate_scenes(user_prompt: str, cfg: AppConfig) -> list[Scene]:
scene_count = int(cfg.get("script_gen.scene_count", 3))
min_chars = int(cfg.get("script_gen.narration_min_chars", 15))
max_chars = int(cfg.get("script_gen.narration_max_chars", 20))
api_key_env = str(cfg.get("openai.api_key_env", "OPENAI_API_KEY"))
base_url_env = str(cfg.get("openai.base_url_env", "OPENAI_BASE_URL"))
model = str(cfg.get("openai.model", "gpt-4o-mini"))
api_key = os.environ.get(api_key_env)
if not api_key:
raise RuntimeError(f"Missing env var {api_key_env} for OpenAI API key")
api_key, base_url = _resolve_openai_credentials(cfg)
client = OpenAI(
api_key=api_key,
base_url=os.environ.get(base_url_env) or None,
base_url=base_url,
)
resp = client.chat.completions.create(
@@ -78,3 +124,52 @@ def generate_scenes(user_prompt: str, cfg: AppConfig) -> list[Scene]:
raise ValueError(f"Scene[{i}] missing required fields")
scenes.append(Scene(image_prompt=image_prompt, video_motion=video_motion, narration=narration))
return scenes
def refine_scene(*, prompt: str, scenes: list[Scene], target_index: int, cfg: AppConfig) -> Scene:
if not (1 <= target_index <= len(scenes)):
raise ValueError("target_index out of range")
min_chars = int(cfg.get("script_gen.narration_min_chars", 15))
max_chars = int(cfg.get("script_gen.narration_max_chars", 20))
model = str(cfg.get("openai.model", "gpt-4o-mini"))
api_key, base_url = _resolve_openai_credentials(cfg)
client = OpenAI(
api_key=api_key,
base_url=base_url,
)
scenes_payload = [
{"image_prompt": s.image_prompt, "video_motion": s.video_motion, "narration": s.narration}
for s in scenes
]
user_payload = {
"prompt": prompt,
"target_index": target_index,
"scenes": scenes_payload,
}
resp = client.chat.completions.create(
model=model,
messages=[
{"role": "system", "content": _refine_system_prompt(min_chars, max_chars)},
{"role": "user", "content": json.dumps(user_payload, ensure_ascii=False)},
],
response_format={"type": "json_object"},
temperature=0.6,
)
content = resp.choices[0].message.content or "{}"
data: Any = json.loads(content)
s = data.get("scene")
if not isinstance(s, dict):
raise ValueError("Model refine output missing scene")
image_prompt = str(s.get("image_prompt", "")).strip()
video_motion = str(s.get("video_motion", "")).strip()
narration = str(s.get("narration", "")).strip()
if not image_prompt or not narration:
raise ValueError("Refined scene missing required fields")
return Scene(image_prompt=image_prompt, video_motion=video_motion, narration=narration)

177
engine/shot_executor.py Normal file

@@ -0,0 +1,177 @@
from __future__ import annotations
import asyncio
import os
import random
from pathlib import Path
from typing import Any
import numpy as np
from moviepy import AudioFileClip, VideoClip
from PIL import Image
from urllib.request import urlopen
from .audio_gen import synthesize_one
from .comfy_client import generate_image as comfy_generate_image
from .config import AppConfig
from .render_pipeline import render_shot as render_shot_pipeline
ASSETS_DIR = "assets"
DEMO_IMAGE = os.path.join(ASSETS_DIR, "demo.jpg")
def ensure_demo_image() -> None:
os.makedirs(ASSETS_DIR, exist_ok=True)
if os.path.exists(DEMO_IMAGE):
return
# Simple placeholder image source.
url = "https://picsum.photos/1280/720"
with urlopen(url, timeout=30) as resp:
data = resp.read()
with open(DEMO_IMAGE, "wb") as f:
f.write(data)
def generate_image_mock(prompt: str) -> str:
# Keep the signature compatible with the real image-generation interface.
_ = prompt
ensure_demo_image()
return DEMO_IMAGE
def enrich_prompt(prompt_text: str) -> str:
style = "cinematic, ultra realistic, 4k, detailed lighting"
pt = (prompt_text or "").strip()
if not pt:
return style
return f"{pt}, {style}"
async def _render_shot_async(
shot: dict[str, Any],
output_dir: str | Path,
cfg: AppConfig,
*,
mock: bool = False,
) -> str:
out_dir = Path(output_dir)
out_dir.mkdir(parents=True, exist_ok=True)
clips_dir = out_dir / "clips"
audio_dir = out_dir / "audio"
clips_dir.mkdir(parents=True, exist_ok=True)
audio_dir.mkdir(parents=True, exist_ok=True)
shot_id = str(shot.get("shot_id", "unknown"))
image_prompt = str(shot.get("image_prompt", "")).strip()
prompt_text = str(shot.get("prompt", image_prompt) or image_prompt).strip()
tts_text = str(shot.get("tts", "")).strip()
duration_s = max(1.0, float(shot.get("duration", 3)))
voice = str(cfg.get("tts.voice", "zh-CN-XiaoxiaoNeural"))
rate = str(cfg.get("tts.rate", "+0%"))
volume = str(cfg.get("tts.volume", "+0%"))
audio_asset: Any | None = None
if tts_text:
audio_path = audio_dir / f"shot_{shot_id}.mp3"
audio_asset = await synthesize_one(tts_text, audio_path, voice, rate, volume)
# Use config-defined output resolution for stable concatenation.
mock_size = cfg.get("video.mock_size", [1024, 576])
w, h = int(mock_size[0]), int(mock_size[1])
fps = int(cfg.get("video.mock_fps", 24))
if audio_asset and audio_asset.duration_s:
duration_s = max(duration_s, float(audio_asset.duration_s))
# shot -> image (ComfyUI first; fallback to demo.jpg)
image_path: str
if mock:
image_path = generate_image_mock(prompt_text)
else:
try:
enriched = enrich_prompt(prompt_text)
# Store generated images directly under outputs/{task_id}
# (as required by verification: outputs/{task_id}/*.png).
image_path = str(
comfy_generate_image(
enriched,
out_dir,
cfg=cfg,
timeout_s=60,
retry=2,
filename_prefix=f"shot_{shot_id}",
)
)
print(f"[SHOT_RENDER] {shot_id} -> image generated: {image_path}")
except Exception as e:
print(f"[WARN] Comfy failed, fallback to demo: {e}")
image_path = generate_image_mock(prompt_text)
# Ensure image exists before rendering.
if not image_path or not os.path.exists(image_path):
image_path = generate_image_mock(prompt_text)
base_img = Image.open(image_path).convert("RGB")
def make_frame(t: float):
# Subtle zoom-in from 1.00 to ~1.03 over the clip duration.
progress = float(t) / max(duration_s, 1e-6)
progress = max(0.0, min(1.0, progress))
scale = 1.0 + 0.03 * progress
new_w = max(w, int(w * scale))
new_h = max(h, int(h * scale))
frame = base_img.resize((new_w, new_h), Image.LANCZOS)
left = (new_w - w) // 2
top = (new_h - h) // 2
frame = frame.crop((left, top, left + w, top + h))
return np.array(frame)
# image -> video
video = VideoClip(make_frame, duration=duration_s, has_constant_size=True)
# optional audio -> clip
audio_clip: AudioFileClip | None = None
if audio_asset and os.path.exists(str(audio_asset.path)):
audio_clip = AudioFileClip(str(audio_asset.path))
video = video.with_audio(audio_clip)
# output
clip_out = clips_dir / f"shot_{shot_id}.mp4"
print(f"[SHOT_RENDER] {shot_id} -> {clip_out}")
try:
video.write_videofile(
str(clip_out),
fps=fps,
codec="libx264",
audio_codec="aac",
preset="veryfast",
threads=2,
)
finally:
try:
video.close()
except Exception:
pass
if audio_clip is not None:
try:
audio_clip.close()
except Exception:
pass
return str(clip_out)
def render_shot(
shot: dict[str, Any],
output_dir: str | Path,
cfg: AppConfig | None = None,
*,
mock: bool = False,
) -> str:
cfg2 = cfg or AppConfig.load("./configs/config.yaml")
return render_shot_pipeline(shot, cfg2, output_dir, mock=mock)
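The `make_frame` closure above implements a subtle 1.00 to ~1.03 push-in: upscale the source as time advances, then center-crop back to the target size so the output resolution stays constant. The crop-box arithmetic can be checked in isolation (pure math, no MoviePy or PIL needed):

```python
def zoom_crop_box(w: int, h: int, t: float, duration_s: float,
                  max_zoom: float = 0.03) -> tuple[int, int, int, int, int, int]:
    """Return (new_w, new_h, left, top, right, bottom) for the centered crop at time t."""
    progress = min(1.0, max(0.0, t / max(duration_s, 1e-6)))
    scale = 1.0 + max_zoom * progress
    new_w, new_h = max(w, int(w * scale)), max(h, int(h * scale))
    left, top = (new_w - w) // 2, (new_h - h) // 2
    return new_w, new_h, left, top, left + w, top + h

start = zoom_crop_box(1024, 576, t=0.0, duration_s=3.0)
end = zoom_crop_box(1024, 576, t=3.0, duration_s=3.0)
```

Because the crop box is always exactly `w` by `h`, every frame the clip emits has the configured resolution, which keeps downstream concatenation stable.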

68
engine/task_store.py Normal file

@@ -0,0 +1,68 @@
from __future__ import annotations

import json
from pathlib import Path
from typing import Any


def _task_path(task_id: str, base_dir: str | Path = "./outputs") -> Path:
    return Path(base_dir) / str(task_id) / "task.json"


def create_task(task_id: str, shots: list[dict[str, Any]], base_dir: str | Path = "./outputs") -> dict[str, Any]:
    p = _task_path(task_id, base_dir=base_dir)
    p.parent.mkdir(parents=True, exist_ok=True)
    data = {
        "task_id": str(task_id),
        "status": "queued",
        "shots": [
            {
                "shot_id": str(s.get("shot_id", "")),
                "status": str(s.get("status", "pending") or "pending"),
            }
            for s in shots
        ],
    }
    p.write_text(json.dumps(data, ensure_ascii=False, indent=2), encoding="utf-8")
    return data


def load_task(task_id: str, base_dir: str | Path = "./outputs") -> dict[str, Any]:
    p = _task_path(task_id, base_dir=base_dir)
    if not p.exists():
        raise FileNotFoundError(f"task file not found: {p}")
    raw = json.loads(p.read_text(encoding="utf-8"))
    if not isinstance(raw, dict):
        raise ValueError("task.json must be an object")
    return raw


def _save_task(task_id: str, data: dict[str, Any], base_dir: str | Path = "./outputs") -> None:
    p = _task_path(task_id, base_dir=base_dir)
    p.parent.mkdir(parents=True, exist_ok=True)
    p.write_text(json.dumps(data, ensure_ascii=False, indent=2), encoding="utf-8")


def update_shot_status(task_id: str, shot_id: str, status: str, base_dir: str | Path = "./outputs") -> dict[str, Any]:
    data = load_task(task_id, base_dir=base_dir)
    shots = data.get("shots")
    if not isinstance(shots, list):
        raise ValueError("task.json shots must be an array")
    found = False
    for s in shots:
        if isinstance(s, dict) and str(s.get("shot_id", "")) == str(shot_id):
            s["status"] = str(status)
            found = True
            break
    if not found:
        shots.append({"shot_id": str(shot_id), "status": str(status)})
    _save_task(task_id, data, base_dir=base_dir)
    return data


def update_task_status(task_id: str, status: str, base_dir: str | Path = "./outputs") -> dict[str, Any]:
    data = load_task(task_id, base_dir=base_dir)
    data["status"] = str(status)
    _save_task(task_id, data, base_dir=base_dir)
    return data
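To make the lifecycle concrete, here is a condensed, self-contained sketch of how these helpers evolve `task.json` (the bodies are re-inlined so the snippet runs on its own; the real implementations are the functions above, and `demo-task` is an illustrative task_id):

```python
# Condensed re-implementation of create_task / update_shot_status from
# engine/task_store.py, inlined so this demo is self-contained.
import json
import tempfile
from pathlib import Path


def _task_path(task_id: str, base_dir: Path) -> Path:
    return Path(base_dir) / str(task_id) / "task.json"


def _save(task_id: str, data: dict, base_dir: Path) -> None:
    p = _task_path(task_id, base_dir)
    p.parent.mkdir(parents=True, exist_ok=True)
    p.write_text(json.dumps(data, ensure_ascii=False, indent=2), encoding="utf-8")


def create_task(task_id: str, shot_ids: list[str], base_dir: Path) -> dict:
    data = {
        "task_id": task_id,
        "status": "queued",
        "shots": [{"shot_id": s, "status": "pending"} for s in shot_ids],
    }
    _save(task_id, data, base_dir)
    return data


def update_shot_status(task_id: str, shot_id: str, status: str, base_dir: Path) -> dict:
    data = json.loads(_task_path(task_id, base_dir).read_text(encoding="utf-8"))
    for s in data["shots"]:
        if s["shot_id"] == shot_id:
            s["status"] = status
            break
    else:
        # Mirrors the module above: unknown shot_ids are appended.
        data["shots"].append({"shot_id": shot_id, "status": status})
    _save(task_id, data, base_dir)
    return data


with tempfile.TemporaryDirectory() as tmp:
    base = Path(tmp)
    create_task("demo-task", ["scene_01_01", "scene_02_01"], base)
    update_shot_status("demo-task", "scene_01_01", "running", base)
    state = update_shot_status("demo-task", "scene_01_01", "done", base)
    print(state["shots"])  # scene_01_01 done, scene_02_01 still pending
```

Persisting state per mutation means a crashed render leaves a readable `task.json` behind, which matches the `"failed"` task files seen in `outputs/` below.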

Binary file not shown.

main.py (149 changed lines)

@@ -7,154 +7,11 @@ import os
import random
from pathlib import Path

from fastapi import FastAPI
from moviepy import ImageClip
from PIL import Image, ImageDraw, ImageFont

from engine.audio_gen import synthesize_scenes
from engine.comfy_client import ComfyClient
from engine.config import AppConfig
from engine.script_gen import generate_scenes
from engine.types import Scene
from engine.video_editor import Segment, render_final

app = FastAPI(title="AiVideo POC")
def _ensure_mock_image(path: Path, size: tuple[int, int]) -> Path:
    if path.exists():
        return path
    path.parent.mkdir(parents=True, exist_ok=True)
    img = Image.new("RGB", size, color=(20, 24, 33))
    draw = ImageDraw.Draw(img)
    text = "MOCK"
    try:
        font = ImageFont.load_default()
    except Exception:
        font = None
    draw.text((size[0] // 2 - 30, size[1] // 2 - 10), text, fill=(240, 240, 240), font=font)
    img.save(path)
    return path
def _make_mock_video(out_path: Path, image_path: Path, duration_s: float, fps: int) -> Path:
    out_path.parent.mkdir(parents=True, exist_ok=True)
    clip = ImageClip(str(image_path)).with_duration(max(0.5, duration_s)).with_fps(fps)
    try:
        clip.write_videofile(str(out_path), codec="libx264", audio=False, fps=fps, preset="veryfast")
    finally:
        clip.close()
    return out_path
def _emit(line: str) -> None:
    print(line, flush=True)


def _emit_scene(scene_idx: int, scene: Scene) -> None:
    payload = {
        "index": scene_idx,
        "image_prompt": scene.image_prompt,
        "video_motion": scene.video_motion,
        "narration": scene.narration,
    }
    _emit("SCENE_JSON " + json.dumps(payload, ensure_ascii=False))
def _fallback_scenes(prompt: str) -> list[Scene]:
    return [
        Scene(
            image_prompt=f"{prompt},城市夜景,霓虹灯,电影感",
            video_motion="缓慢推进镜头,轻微摇镜",
            narration="夜色温柔落在街灯上",
        ),
        Scene(
            image_prompt=f"{prompt},咖啡店窗边,暖光,细雨",
            video_motion="侧向平移,人物轻轻抬头",
            narration="雨声里藏着一段回忆",
        ),
        Scene(
            image_prompt=f"{prompt},桥上远景,车流光轨,温暖",
            video_motion="拉远全景,光轨流动",
            narration="我们在光里学会告别",
        ),
    ]
def _should_allow_llm_without_key(cfg: AppConfig) -> bool:
    api_key_env = str(cfg.get("openai.api_key_env", "OPENAI_API_KEY"))
    return bool(os.environ.get(api_key_env))


def _generate_scenes_for_run(prompt: str, cfg: AppConfig, mock: bool) -> list[Scene]:
    if mock and not _should_allow_llm_without_key(cfg):
        return _fallback_scenes(prompt)
    try:
        return generate_scenes(prompt, cfg)
    except Exception:
        if mock:
            return _fallback_scenes(prompt)
        raise
async def run_pipeline(prompt: str, cfg: AppConfig, mock: bool) -> Path:
    scenes = _generate_scenes_for_run(prompt, cfg, mock=mock)
    audios = await synthesize_scenes([s.narration for s in scenes], cfg)
    segments: list[Segment] = []
    fps = int(cfg.get("video.mock_fps", 24))
    mock_size = cfg.get("video.mock_size", [1024, 576])
    w, h = int(mock_size[0]), int(mock_size[1])
    mock_image = _ensure_mock_image(Path("./assets/mock.png"), (w, h))
    if mock:
        for i, (scene, audio) in enumerate(zip(scenes, audios), start=1):
            vpath = Path("./assets/mock_videos") / f"scene_{i:02d}.mp4"
            _make_mock_video(vpath, mock_image, audio.duration_s, fps=fps)
            segments.append(Segment(video_path=vpath, audio_path=audio.path, narration=scene.narration))
        return render_final(segments, cfg)

    comfy = ComfyClient(cfg)
    wf = comfy.load_workflow()
    for i, (scene, audio) in enumerate(zip(scenes, audios), start=1):
        seed = random.randint(1, 2_147_483_647)
        wf_i = comfy.inject_params(wf, image_prompt=scene.image_prompt, seed=seed, motion_prompt=scene.video_motion or None)
        result = await comfy.run_workflow(wf_i)
        # Pick the first mp4-like output; if none, fall back to the first file.
        candidates = [p for p in result.output_files if p.suffix.lower() in {".mp4", ".mov", ".webm"}]
        video_path = candidates[0] if candidates else result.output_files[0]
        segments.append(Segment(video_path=video_path, audio_path=audio.path, narration=scene.narration))
    return render_final(segments, cfg)
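The output-file selection at the end of the Comfy loop is a small but load-bearing detail: video-like suffixes win, anything else is a last-resort fallback. A minimal standalone sketch of the same preference order (`pick_video` is an illustrative name, not part of the engine):

```python
# Illustrative helper mirroring the candidate-selection logic in run_pipeline:
# prefer video-like suffixes, otherwise fall back to the first output file.
from pathlib import Path


def pick_video(output_files: list[Path]) -> Path:
    candidates = [p for p in output_files if p.suffix.lower() in {".mp4", ".mov", ".webm"}]
    return candidates[0] if candidates else output_files[0]


files = [Path("out/preview.png"), Path("out/scene_01.MP4"), Path("out/scene_01.webm")]
print(pick_video(files))  # matches scene_01.MP4: the suffix check is case-insensitive
```

Note that an empty `output_files` list would raise `IndexError` in both this sketch and the original loop; the engine presumably relies on ComfyUI always producing at least one output.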
def script_only(prompt: str, cfg: AppConfig, mock: bool) -> int:
    scenes = _generate_scenes_for_run(prompt, cfg, mock=mock)
    _emit("SCRIPT_BEGIN")
    for idx, s in enumerate(scenes, start=1):
        _emit_scene(idx, s)
    _emit("SCRIPT_END")
    return 0
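`script_only` defines a tiny line protocol on stdout (`SCRIPT_BEGIN`, one `SCENE_JSON {...}` per scene, `SCRIPT_END`) that the Node layer converts into SSE events. The Node side is not shown here, so this Python consumer is an illustrative sketch of how such lines can be parsed (the sample payload is made up, but its fields match `_emit_scene`):

```python
# Illustrative consumer for the stdout protocol emitted by script_only above.
# The payload fields mirror _emit_scene; the sample text is invented.
import json


def parse_scene_events(stdout: str) -> list[dict]:
    scenes = []
    for line in stdout.splitlines():
        if line.startswith("SCENE_JSON "):
            scenes.append(json.loads(line[len("SCENE_JSON "):]))
    return scenes


sample = "\n".join([
    "SCRIPT_BEGIN",
    'SCENE_JSON {"index": 1, "image_prompt": "night street", "video_motion": "slow push-in", "narration": "..."}',
    "SCRIPT_END",
])
print(parse_scene_events(sample))
```

Keeping the payload on a single tagged line is what makes the Node side trivial: it can split the child process's stdout on newlines and forward each `SCENE_JSON` payload as one SSE event.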
def main() -> int:
    parser = argparse.ArgumentParser(description="AIGC auto video generation POC")
    parser.add_argument("--prompt", required=True, help="User creative prompt")
    parser.add_argument("--config", default="./configs/config.yaml", help="Config yaml path")
    parser.add_argument("--mock", action="store_true", help="Mock mode (no ComfyUI needed)")
    parser.add_argument(
        "--script-only",
        action="store_true",
        help="Only generate script/scenes and print to stdout (for Node.js streaming)",
    )
    args = parser.parse_args()
    # Backward-compatible entry: prefer delegating to engine/main.py, which now
    # owns the pipeline; keep the legacy in-file paths as a fallback.
    try:
        from engine.main import main as engine_main
    except ImportError:
        engine_main = None
    if engine_main is not None:
        return engine_main()
    cfg = AppConfig.load(args.config)
    if args.script_only:
        return script_only(args.prompt, cfg, mock=args.mock)
    out = asyncio.run(run_pipeline(args.prompt, cfg, mock=args.mock))
    print(str(out))
    return 0


if __name__ == "__main__":
    raise SystemExit(main())


@@ -0,0 +1,18 @@
{
"task_id": "'06b0a90f-c964-4a88-8e80-6ff668e031b3'",
"status": "failed",
"shots": [
{
"shot_id": "scene_01_01",
"status": "running"
},
{
"shot_id": "scene_02_01",
"status": "pending"
},
{
"shot_id": "scene_03_01",
"status": "pending"
}
]
}


@@ -0,0 +1,18 @@
{
"task_id": "'13c9b724-77e3-4553-aebf-dfc845dd17c1'",
"status": "done",
"shots": [
{
"shot_id": "scene_01_01",
"status": "done"
},
{
"shot_id": "scene_02_01",
"status": "done"
},
{
"shot_id": "scene_03_01",
"status": "done"
}
]
}


@@ -0,0 +1,19 @@
{
"scenes": [
{
"image_prompt": "写一个温暖的城市夜景故事\n\n\n[Global Constraints]\n- Global Style: 电影感\n请严格遵守上述全局信息并保持三分镜主角一致。城市夜景霓虹灯电影感",
"video_motion": "缓慢推进镜头,轻微摇镜",
"narration": "夜色温柔落在街灯上"
},
{
"image_prompt": "写一个温暖的城市夜景故事\n\n\n[Global Constraints]\n- Global Style: 电影感\n请严格遵守上述全局信息并保持三分镜主角一致。咖啡店窗边暖光细雨",
"video_motion": "侧向平移,人物轻轻抬头",
"narration": "雨声里藏着一段回忆"
},
{
"image_prompt": "写一个温暖的城市夜景故事\n\n\n[Global Constraints]\n- Global Style: 电影感\n请严格遵守上述全局信息并保持三分镜主角一致。桥上远景车流光轨温暖",
"video_motion": "拉远全景,光轨流动",
"narration": "我们在光里学会告别"
}
]
}

Three binary image files added (1.1 MiB each); contents not shown.


@@ -0,0 +1,19 @@
{
"scenes": [
{
"image_prompt": "写一个温暖的城市夜景故事,城市夜景,霓虹灯,电影感",
"video_motion": "缓慢推进镜头,轻微摇镜",
"narration": "夜色温柔落在街灯上"
},
{
"image_prompt": "写一个温暖的城市夜景故事,咖啡店窗边,暖光,细雨",
"video_motion": "侧向平移,人物轻轻抬头",
"narration": "雨声里藏着一段回忆"
},
{
"image_prompt": "写一个温暖的城市夜景故事,桥上远景,车流光轨,温暖",
"video_motion": "拉远全景,光轨流动",
"narration": "我们在光里学会告别"
}
]
}


@@ -0,0 +1,19 @@
{
"scenes": [
{
"image_prompt": "写一个温暖的城市夜景故事\n\n\n[Global Constraints]\n- Global Style: 电影感\n请严格遵守上述全局信息并保持三分镜主角一致。城市夜景霓虹灯电影感",
"video_motion": "缓慢推进镜头,轻微摇镜",
"narration": "夜色温柔落在街灯上"
},
{
"image_prompt": "写一个温暖的城市夜景故事\n\n\n[Global Constraints]\n- Global Style: 电影感\n请严格遵守上述全局信息并保持三分镜主角一致。咖啡店窗边暖光细雨",
"video_motion": "侧向平移,人物轻轻抬头",
"narration": "雨声里藏着一段回忆"
},
{
"image_prompt": "写一个温暖的城市夜景故事\n\n\n[Global Constraints]\n- Global Style: 电影感\n请严格遵守上述全局信息并保持三分镜主角一致。桥上远景车流光轨温暖",
"video_motion": "拉远全景,光轨流动",
"narration": "我们在光里学会告别"
}
]
}

Three binary image files added (1.1 MiB each) and one further binary file; contents not shown.

@@ -0,0 +1,19 @@
{
"scenes": [
{
"image_prompt": "写一个温暖的城市夜景故事,城市夜景,霓虹灯,电影感",
"video_motion": "缓慢推进镜头,轻微摇镜",
"narration": "夜色温柔落在街灯上"
},
{
"image_prompt": "写一个温暖的城市夜景故事,咖啡店窗边,暖光,细雨",
"video_motion": "侧向平移,人物轻轻抬头",
"narration": "雨声里藏着一段回忆"
},
{
"image_prompt": "写一个温暖的城市夜景故事,桥上远景,车流光轨,温暖",
"video_motion": "拉远全景,光轨流动",
"narration": "我们在光里学会告别"
}
]
}


@@ -0,0 +1,18 @@
{
"task_id": "3ef0c0b8-c90f-49a8-88e4-e8ca735312f0",
"status": "done",
"shots": [
{
"shot_id": "scene_01_01",
"status": "done"
},
{
"shot_id": "scene_02_01",
"status": "done"
},
{
"shot_id": "scene_03_01",
"status": "done"
}
]
}

Binary file not shown.


@@ -0,0 +1,10 @@
{
"task_id": "3f82b1ce-da18-4f82-9147-25eb0abeaf2c",
"status": "done",
"shots": [
{
"shot_id": "scene_01_01",
"status": "done"
}
]
}


@@ -0,0 +1,19 @@
{
"scenes": [
{
"image_prompt": "写一个温暖的城市夜景故事\n\n\n[Global Constraints]\n- Global Style: 电影感\n请严格遵守上述全局信息并保持三分镜主角一致。城市夜景霓虹灯电影感",
"video_motion": "缓慢推进镜头,轻微摇镜",
"narration": "夜色温柔落在街灯上"
},
{
"image_prompt": "写一个温暖的城市夜景故事\n\n\n[Global Constraints]\n- Global Style: 电影感\n请严格遵守上述全局信息并保持三分镜主角一致。咖啡店窗边暖光细雨",
"video_motion": "侧向平移,人物轻轻抬头",
"narration": "雨声里藏着一段回忆"
},
{
"image_prompt": "写一个温暖的城市夜景故事\n\n\n[Global Constraints]\n- Global Style: 电影感\n请严格遵守上述全局信息并保持三分镜主角一致。桥上远景车流光轨温暖",
"video_motion": "拉远全景,光轨流动",
"narration": "我们在光里学会告别"
}
]
}

Three binary image files added (1.1 MiB each); contents not shown.


@@ -0,0 +1,19 @@
{
"scenes": [
{
"image_prompt": "写一个温暖的城市夜景故事\n\n\n[Global Constraints]\n- Global Style: 电影感\n请严格遵守上述全局信息并保持三分镜主角一致。城市夜景霓虹灯电影感",
"video_motion": "缓慢推进镜头,轻微摇镜",
"narration": "夜色温柔落在街灯上"
},
{
"image_prompt": "写一个温暖的城市夜景故事\n\n\n[Global Constraints]\n- Global Style: 电影感\n请严格遵守上述全局信息并保持三分镜主角一致。咖啡店窗边暖光细雨",
"video_motion": "侧向平移,人物轻轻抬头",
"narration": "雨声里藏着一段回忆"
},
{
"image_prompt": "写一个温暖的城市夜景故事\n\n\n[Global Constraints]\n- Global Style: 电影感\n请严格遵守上述全局信息并保持三分镜主角一致。桥上远景车流光轨温暖",
"video_motion": "拉远全景,光轨流动",
"narration": "我们在光里学会告别"
}
]
}

Three binary image files added (1.1 MiB each) and one further binary file; contents not shown.


@@ -0,0 +1,18 @@
{
"task_id": "62da5541-43d2-4ead-a243-e68345877dff",
"status": "done",
"shots": [
{
"shot_id": "scene_01_01",
"status": "done"
},
{
"shot_id": "scene_02_01",
"status": "done"
},
{
"shot_id": "scene_03_01",
"status": "done"
}
]
}

Binary file not shown.


@@ -0,0 +1,19 @@
{
"scenes": [
{
"image_prompt": "写一个温暖的城市夜景故事\n\n\n[Global Constraints]\n- Global Style: 电影感\n请严格遵守上述全局信息并保持三分镜主角一致。城市夜景霓虹灯电影感",
"video_motion": "缓慢推进镜头,轻微摇镜",
"narration": "夜色温柔落在街灯上"
},
{
"image_prompt": "写一个温暖的城市夜景故事\n\n\n[Global Constraints]\n- Global Style: 电影感\n请严格遵守上述全局信息并保持三分镜主角一致。咖啡店窗边暖光细雨",
"video_motion": "侧向平移,人物轻轻抬头",
"narration": "雨声里藏着一段回忆"
},
{
"image_prompt": "写一个温暖的城市夜景故事\n\n\n[Global Constraints]\n- Global Style: 电影感\n请严格遵守上述全局信息并保持三分镜主角一致。桥上远景车流光轨温暖",
"video_motion": "拉远全景,光轨流动",
"narration": "我们在光里学会告别"
}
]
}

Some files were not shown because too many files have changed in this diff.