"""Interactive AIGC POC pipeline entry (steps: script / refine / render)."""
from __future__ import annotations
|
|
|
|
import argparse
|
|
import asyncio
|
|
import json
|
|
import os
|
|
import random
|
|
import sys
|
|
from pathlib import Path
|
|
from typing import Any
|
|
|
|
from moviepy import ImageClip
|
|
from PIL import Image, ImageDraw, ImageFont
|
|
|
|
from engine.audio_gen import synthesize_scenes
|
|
from engine.comfy_client import ComfyClient
|
|
from engine.config import AppConfig
|
|
from engine.script_gen import generate_scenes, refine_scene
|
|
from engine.types import Scene
|
|
from engine.video_editor import Segment, render_final
|
|
|
|
|
|
def _emit(line: str) -> None:
|
|
print(line, flush=True)
|
|
|
|
|
|
def _emit_scene(scene_idx: int, scene: Scene) -> None:
    """Emit one scene as a SCENE_JSON protocol line (1-based index)."""
    doc = json.dumps(
        {
            "index": scene_idx,
            "image_prompt": scene.image_prompt,
            "video_motion": scene.video_motion,
            "narration": scene.narration,
        },
        ensure_ascii=False,
    )
    _emit(f"SCENE_JSON {doc}")
|
|
|
|
|
|
def _ensure_mock_image(path: Path, size: tuple[int, int]) -> Path:
    """Create a dark placeholder image labelled "MOCK" at *path* if absent.

    Returns *path* whether or not the image had to be created.
    """
    if not path.exists():
        path.parent.mkdir(parents=True, exist_ok=True)
        canvas = Image.new("RGB", size, color=(20, 24, 33))
        pen = ImageDraw.Draw(canvas)
        try:
            fnt = ImageFont.load_default()
        except Exception:
            # Fall back to Pillow's implicit default font selection.
            fnt = None
        # Rough centering via a fixed offset from the midpoint.
        pen.text((size[0] // 2 - 30, size[1] // 2 - 10), "MOCK", fill=(240, 240, 240), font=fnt)
        canvas.save(path)
    return path
|
|
|
|
|
|
def _make_mock_video(out_path: Path, image_path: Path, duration_s: float, fps: int) -> Path:
    """Encode a still image as a silent H.264 mock clip and return its path."""
    out_path.parent.mkdir(parents=True, exist_ok=True)
    # Enforce a 0.5s floor so a zero-length narration still yields a playable clip.
    safe_duration = max(0.5, duration_s)
    clip = ImageClip(str(image_path)).with_duration(safe_duration).with_fps(fps)
    try:
        clip.write_videofile(str(out_path), codec="libx264", audio=False, fps=fps, preset="veryfast")
    finally:
        # Always release moviepy resources, even if encoding fails.
        clip.close()
    return out_path
|
|
|
|
|
|
def _prog(p: float, msg: str) -> None:
    """Emit a PROG protocol line with progress clamped into [0.0, 1.0]."""
    clamped = max(0.0, min(1.0, float(p)))
    _emit("PROG " + json.dumps({"p": clamped, "msg": msg}, ensure_ascii=False))
|
|
|
|
|
|
def _normalize_style(style: str | None) -> str:
|
|
s = (style or "").strip()
|
|
if not s:
|
|
return ""
|
|
# Allow both Chinese labels and simple aliases
|
|
mapping = {
|
|
"电影感": "电影感",
|
|
"cinema": "电影感",
|
|
"二次元": "二次元",
|
|
"anime": "二次元",
|
|
"写实": "写实",
|
|
"real": "写实",
|
|
}
|
|
return mapping.get(s, s)
|
|
|
|
|
|
def _inject_globals_into_prompt(prompt: str, *, style: str | None, character: str | None) -> str:
    """Append a [Global Constraints] section for style/character to *prompt*.

    Returns *prompt* untouched when neither constraint is set.
    """
    norm_style = _normalize_style(style)
    norm_char = (character or "").strip()
    if not (norm_style or norm_char):
        return prompt
    lines = [prompt.strip(), "\n\n[Global Constraints]"]
    if norm_style:
        lines.append(f"- Global Style: {norm_style}")
    if norm_char:
        lines.append(f"- Character Preset: {norm_char}")
    lines.append("请严格遵守上述全局信息,并保持三分镜主角一致。")
    return "\n".join(lines).strip()
|
|
|
|
|
|
def _decorate_image_prompt(image_prompt: str, *, style: str | None, character: str | None) -> str:
    """Build the final image prompt per the industrial rule.

    Layout: f"{global_character}, {global_style}, {scene_prompt}", with empty
    components dropped.
    """
    pieces = [
        (character or "").strip(),
        _normalize_style(style),
        image_prompt,
    ]
    return ", ".join(p for p in pieces if p).strip(", ")
|
|
|
|
|
|
def _fallback_scenes(prompt: str) -> list[Scene]:
    """Return a hard-coded three-scene storyboard seeded with *prompt*.

    Used when no LLM key is available (mock mode).
    """
    specs = [
        ("城市夜景,霓虹灯,电影感", "缓慢推进镜头,轻微摇镜", "夜色温柔落在街灯上"),
        ("咖啡店窗边,暖光,细雨", "侧向平移,人物轻轻抬头", "雨声里藏着一段回忆"),
        ("桥上远景,车流光轨,温暖", "拉远全景,光轨流动", "我们在光里学会告别"),
    ]
    return [
        Scene(image_prompt=f"{prompt},{img}", video_motion=motion, narration=text)
        for img, motion, text in specs
    ]
|
|
|
|
|
|
def _has_llm_key(cfg: AppConfig) -> bool:
    """True when the configured OpenAI API key env var is set and non-empty."""
    env_name = str(cfg.get("openai.api_key_env", "OPENAI_API_KEY"))
    return env_name in os.environ and bool(os.environ[env_name])
|
|
|
|
|
|
def _parse_scenes_from_obj(obj: Any) -> list[Scene]:
    """Parse a stdin payload into a list of Scene objects.

    Accepts either {"scene": {...}} (single scene takes precedence) or
    {"scenes": [{...}, ...]} (full storyboard). Missing fields default to "".

    Raises:
        ValueError: when the payload shape is not one of the above.
    """

    def _to_scene(d: dict) -> Scene:
        # Coerce every field to a stripped plain string so downstream code
        # never sees None or non-str values.
        return Scene(
            image_prompt=str(d.get("image_prompt", "")).strip(),
            video_motion=str(d.get("video_motion", "")).strip(),
            narration=str(d.get("narration", "")).strip(),
        )

    if not isinstance(obj, dict):
        raise ValueError("payload must be object")
    if obj.get("scene") is not None:
        s = obj["scene"]
        if not isinstance(s, dict):
            raise ValueError("payload.scene must be object")
        return [_to_scene(s)]
    scenes_raw = obj.get("scenes")
    if not isinstance(scenes_raw, list) or not scenes_raw:
        raise ValueError("payload.scenes must be non-empty array")
    scenes: list[Scene] = []
    # 0-based enumerate so "scenes[i]" in the error matches JSON array indexing
    # (the previous message used a 1-based i, which was off by one).
    for i, s in enumerate(scenes_raw):
        if not isinstance(s, dict):
            raise ValueError(f"scenes[{i}] must be object")
        scenes.append(_to_scene(s))
    return scenes
|
|
|
|
|
|
async def _render_from_scenes(
    prompt: str,
    scenes: list[Scene],
    cfg: AppConfig,
    mock: bool,
    *,
    style: str | None,
    character: str | None,
    out_dir: Path,
) -> Path:
    """Synthesize TTS, render each scene to video (mock or ComfyUI), composite.

    Returns the path of the final composited video (out_dir/final.mp4).

    NOTE(review): image prompts are decorated with globals here even though
    step_script already emits decorated prompts — if the frontend round-trips
    the emitted scenes back into render, globals may be prepended twice;
    verify the client flow.
    """
    # Force-inject globals into image prompts for rendering.
    scenes2 = [
        Scene(
            image_prompt=_decorate_image_prompt(s.image_prompt, style=style, character=character),
            video_motion=s.video_motion,
            narration=s.narration,
        )
        for s in scenes
    ]

    _prog(0.15, "Generating TTS")
    audios = await synthesize_scenes([s.narration for s in scenes2], cfg)

    segments: list[Segment] = []
    fps = int(cfg.get("video.mock_fps", 24))
    mock_size = cfg.get("video.mock_size", [1024, 576])
    w, h = int(mock_size[0]), int(mock_size[1])
    # The placeholder frame is created unconditionally; it is cheap and only
    # actually consumed by the mock path.
    mock_image = _ensure_mock_image(Path("./assets/mock.png"), (w, h))

    if mock:
        _prog(0.35, "Generating mock videos")
        for i, (scene, audio) in enumerate(zip(scenes2, audios), start=1):
            vpath = Path("./assets/mock_videos") / f"scene_{i:02d}.mp4"
            _make_mock_video(vpath, mock_image, audio.duration_s, fps=fps)
            segments.append(Segment(video_path=vpath, audio_path=audio.path, narration=scene.narration))
    else:
        comfy = ComfyClient(cfg)
        wf = comfy.load_workflow()
        for i, (scene, audio) in enumerate(zip(scenes2, audios), start=1):
            _prog(0.25 + 0.45 * (i - 1) / max(1, len(scenes2)), f"Rendering scene {i} with ComfyUI")
            # Fresh random seed per scene so re-renders vary.
            seed = random.randint(1, 2_147_483_647)
            wf_i = comfy.inject_params(
                wf, image_prompt=scene.image_prompt, seed=seed, motion_prompt=scene.video_motion or None
            )
            result = await comfy.run_workflow(wf_i)
            # Guard against an empty result: previously this raised a bare
            # IndexError on output_files[0]; fail with a clear message instead.
            if not result.output_files:
                raise RuntimeError(f"ComfyUI produced no output files for scene {i}")
            # Prefer a video container; otherwise fall back to the first output.
            candidates = [p for p in result.output_files if p.suffix.lower() in {".mp4", ".mov", ".webm"}]
            video_path = candidates[0] if candidates else result.output_files[0]
            segments.append(Segment(video_path=video_path, audio_path=audio.path, narration=scene.narration))

    # Shared tail (was duplicated in both branches).
    _prog(0.85, "Compositing final video")
    out_path = out_dir / "final.mp4"
    return render_final(segments, cfg, output_path=out_path)
|
|
|
|
|
|
def _read_stdin_json() -> Any:
|
|
raw = sys.stdin.read()
|
|
if not raw.strip():
|
|
return None
|
|
return json.loads(raw)
|
|
|
|
|
|
def step_script(prompt: str, cfg: AppConfig, mock: bool, *, style: str | None, character: str | None, out_dir: Path) -> int:
    """Generate the storyboard, stream it via protocol lines, persist scenes.json.

    Returns 0 on success.
    """
    prompt2 = _inject_globals_into_prompt(prompt, style=style, character=character)
    if mock and not _has_llm_key(cfg):
        # fallback scenes still should include global injection
        scenes = _fallback_scenes(prompt)
    else:
        scenes = generate_scenes(prompt2, cfg)

    out_dir.mkdir(parents=True, exist_ok=True)
    _emit("SCRIPT_BEGIN")
    for idx, scene in enumerate(scenes, start=1):
        decorated = Scene(
            image_prompt=_decorate_image_prompt(scene.image_prompt, style=style, character=character),
            video_motion=scene.video_motion,
            narration=scene.narration,
        )
        _emit_scene(idx, decorated)
    _emit("SCRIPT_END")

    # Persist the *undecorated* scenes (emitted ones carry the globals already).
    doc = {
        "scenes": [
            {"image_prompt": s.image_prompt, "video_motion": s.video_motion, "narration": s.narration}
            for s in scenes
        ]
    }
    (out_dir / "scenes.json").write_text(
        json.dumps(doc, ensure_ascii=False, indent=2),
        encoding="utf-8",
    )
    return 0
|
|
|
|
|
|
def step_refine(
    prompt: str,
    cfg: AppConfig,
    mock: bool,
    scene_index: int,
    *,
    style: str | None,
    character: str | None,
    out_dir: Path,
) -> int:
    """Refine one scene of a storyboard received on stdin and emit the result.

    Returns 0 on success; raises ValueError for a bad payload or index.
    """
    prompt2 = _inject_globals_into_prompt(prompt, style=style, character=character)
    scenes = _parse_scenes_from_obj(_read_stdin_json())

    # If client only sent one scene, treat it as the target scene.
    target_index = 1 if len(scenes) == 1 else scene_index
    if not (1 <= target_index <= len(scenes)):
        raise ValueError("scene_index out of range")

    source = scenes[target_index - 1]
    if mock and not _has_llm_key(cfg):
        # Simple fallback: append a tiny polish hint to narration
        refined = Scene(
            image_prompt=_decorate_image_prompt(source.image_prompt, style=style, character=character),
            video_motion=source.video_motion,
            narration=(source.narration + "(更凝练)")[:30],
        )
    else:
        # Ensure globals are visible to LLM, and inject to output image prompt.
        refined0 = refine_scene(prompt=prompt2, scenes=scenes, target_index=target_index, cfg=cfg)
        refined = Scene(
            image_prompt=_decorate_image_prompt(refined0.image_prompt, style=style, character=character),
            video_motion=refined0.video_motion,
            narration=refined0.narration,
        )

    # Keep the original index for frontend replacement.
    _emit_scene(scene_index, refined)
    out_dir.mkdir(parents=True, exist_ok=True)
    record = {
        "index": scene_index,
        "image_prompt": refined.image_prompt,
        "video_motion": refined.video_motion,
        "narration": refined.narration,
    }
    (out_dir / f"refine_scene_{scene_index}.json").write_text(
        json.dumps(record, ensure_ascii=False, indent=2),
        encoding="utf-8",
    )
    return 0
|
|
|
|
|
|
def step_render(prompt: str, cfg: AppConfig, mock: bool, *, style: str | None, character: str | None, out_dir: Path) -> int:
    """Read scenes from stdin, render the final video, and report progress.

    Returns 0 on success.
    """
    scenes = _parse_scenes_from_obj(_read_stdin_json())
    out_dir.mkdir(parents=True, exist_ok=True)
    _prog(0.05, "Start render")
    final_path = asyncio.run(
        _render_from_scenes(prompt, scenes, cfg, mock=mock, style=style, character=character, out_dir=out_dir)
    )
    _prog(1.0, "Render finished")
    _emit("RENDER_DONE " + json.dumps({"output": str(final_path)}, ensure_ascii=False))
    return 0
|
|
|
|
|
|
def main() -> int:
    """CLI entry: parse args, load config, and dispatch to the requested step."""
    parser = argparse.ArgumentParser(description="AIGC interactive POC entry")
    parser.add_argument("--prompt", required=True, help="User creative prompt")
    parser.add_argument("--config", default="./configs/config.yaml", help="Config yaml path")
    parser.add_argument("--mock", action="store_true", help="Mock mode (no ComfyUI needed)")
    parser.add_argument("--step", default="script", choices=["script", "render", "refine"])
    parser.add_argument("--scene-index", type=int, default=1, help="For --step=refine only (1-based)")
    parser.add_argument("--global-style", default="", help="Global style lock (e.g. 电影感/二次元/写实)")
    parser.add_argument("--character", default="", help="Character preset lock (main character description)")
    parser.add_argument("--task-id", required=True, help="Task id (UUID). Outputs go to outputs/{task_id}/")
    args = parser.parse_args()

    cfg = AppConfig.load(args.config)
    out_dir = Path("./outputs") / str(args.task_id)

    if args.step == "script":
        return step_script(args.prompt, cfg, mock=args.mock, style=args.global_style, character=args.character, out_dir=out_dir)
    elif args.step == "render":
        return step_render(args.prompt, cfg, mock=args.mock, style=args.global_style, character=args.character, out_dir=out_dir)
    elif args.step == "refine":
        return step_refine(
            args.prompt,
            cfg,
            mock=args.mock,
            scene_index=args.scene_index,
            style=args.global_style,
            character=args.character,
            out_dir=out_dir,
        )
    # argparse choices should make this unreachable; exit 2 defensively.
    raise SystemExit(2)
|
|
|
|
|
|
# Script entry point: propagate main()'s return value as the process exit code.
if __name__ == "__main__":
    raise SystemExit(main())
|
|
|