178 lines
5.3 KiB
Python
178 lines
5.3 KiB
Python
from __future__ import annotations

import asyncio
import os
import random
import tempfile
from pathlib import Path
from typing import Any
from urllib.request import urlopen

import numpy as np
from moviepy import AudioFileClip, VideoClip
from PIL import Image

from .audio_gen import synthesize_one
from .comfy_client import generate_image as comfy_generate_image
from .config import AppConfig
from .render_pipeline import render_shot as render_shot_pipeline
|
|
|
|
|
|
# Directory for locally cached static assets (created on demand).
ASSETS_DIR = "assets"

# Placeholder image used whenever real image generation is unavailable or fails.
DEMO_IMAGE = os.path.join(ASSETS_DIR, "demo.jpg")
|
|
|
|
|
|
def ensure_demo_image() -> None:
    """Download a placeholder image to ``DEMO_IMAGE`` if it does not exist yet.

    The image is fetched from picsum.photos and written atomically
    (temp file + ``os.replace``) so that an interrupted download can never
    leave a truncated demo.jpg behind — the plain existence check below
    would otherwise accept a corrupt file on every later call.
    """
    os.makedirs(ASSETS_DIR, exist_ok=True)
    if os.path.exists(DEMO_IMAGE):
        return

    # Simple placeholder image source.
    url = "https://picsum.photos/1280/720"
    with urlopen(url, timeout=30) as resp:
        data = resp.read()

    # Write to a temp file in the same directory, then rename into place.
    # os.replace is atomic on the same filesystem, so concurrent callers
    # only ever see either no file or a complete one.
    fd, tmp_path = tempfile.mkstemp(dir=ASSETS_DIR, suffix=".jpg")
    try:
        with os.fdopen(fd, "wb") as f:
            f.write(data)
        os.replace(tmp_path, DEMO_IMAGE)
    except BaseException:
        # Best-effort cleanup; never mask the original error.
        try:
            os.unlink(tmp_path)
        except OSError:
            pass
        raise
|
|
|
|
|
|
def generate_image_mock(prompt: str) -> str:
    """Mock image generator: ignore *prompt*, return the bundled demo image.

    Keeps the same signature as the real generator so callers can swap
    implementations without changes.
    """
    del prompt  # present for interface compatibility only; value unused
    ensure_demo_image()
    return DEMO_IMAGE
|
|
|
|
|
|
def enrich_prompt(prompt_text: str) -> str:
    """Append a fixed cinematic style suffix to *prompt_text*.

    A ``None``, empty, or whitespace-only prompt yields just the style
    string; otherwise the stripped prompt and the suffix are joined with
    a comma.
    """
    style_suffix = "cinematic, ultra realistic, 4k, detailed lighting"
    cleaned = (prompt_text or "").strip()
    return f"{cleaned}, {style_suffix}" if cleaned else style_suffix
|
|
|
|
|
|
async def _render_shot_async(
    shot: dict[str, Any],
    output_dir: str | Path,
    cfg: AppConfig,
    *,
    mock: bool = False,
) -> str:
    """Render one shot dict into an mp4 clip and return its path.

    Pipeline: optional TTS audio -> still image (ComfyUI, with demo-image
    fallback) -> Ken-Burns-style zoom video -> mp4 under
    ``{output_dir}/clips/shot_{shot_id}.mp4``.

    Args:
        shot: Shot spec; keys read here: ``shot_id``, ``image_prompt``,
            ``prompt``, ``tts``, ``duration``. All are optional.
        output_dir: Task output directory; ``clips/`` and ``audio/``
            subdirectories are created under it.
        cfg: Application config; ``tts.*`` and ``video.mock_*`` keys are read.
        mock: If True, skip ComfyUI entirely and use the demo image.

    Returns:
        str path of the written mp4 clip.
    """
    out_dir = Path(output_dir)
    out_dir.mkdir(parents=True, exist_ok=True)
    clips_dir = out_dir / "clips"
    audio_dir = out_dir / "audio"
    clips_dir.mkdir(parents=True, exist_ok=True)
    audio_dir.mkdir(parents=True, exist_ok=True)

    # Pull shot fields defensively: every value is coerced to str/float so a
    # loosely typed shot dict (e.g. parsed JSON with numbers) still works.
    shot_id = str(shot.get("shot_id", "unknown"))
    image_prompt = str(shot.get("image_prompt", "")).strip()
    # "prompt" wins if present and non-empty; fall back to "image_prompt".
    prompt_text = str(shot.get("prompt", image_prompt) or image_prompt).strip()
    tts_text = str(shot.get("tts", "")).strip()
    # Clamp to at least 1 second so a zero/garbage duration can't produce
    # an empty clip.
    duration_s = max(1.0, float(shot.get("duration", 3)))

    voice = str(cfg.get("tts.voice", "zh-CN-XiaoxiaoNeural"))
    rate = str(cfg.get("tts.rate", "+0%"))
    volume = str(cfg.get("tts.volume", "+0%"))
    # audio_asset comes from the project TTS layer; presumably it exposes
    # .path and .duration_s (both used below) — typed Any for that reason.
    audio_asset: Any | None = None
    if tts_text:
        audio_path = audio_dir / f"shot_{shot_id}.mp3"
        audio_asset = await synthesize_one(tts_text, audio_path, voice, rate, volume)

    # Use config-defined output resolution for stable concatenation.
    mock_size = cfg.get("video.mock_size", [1024, 576])
    w, h = int(mock_size[0]), int(mock_size[1])
    fps = int(cfg.get("video.mock_fps", 24))

    # Extend the clip so narration is never cut off mid-sentence.
    if audio_asset and audio_asset.duration_s:
        duration_s = max(duration_s, float(audio_asset.duration_s))

    # shot -> image (ComfyUI first; fallback to demo.jpg)
    image_path: str
    if mock:
        image_path = generate_image_mock(prompt_text)
    else:
        try:
            enriched = enrich_prompt(prompt_text)
            # Store generated images directly under outputs/{task_id}
            # (as required by verification: outputs/{task_id}/*.png).
            image_path = str(
                comfy_generate_image(
                    enriched,
                    out_dir,
                    cfg=cfg,
                    timeout_s=60,
                    retry=2,
                    filename_prefix=f"shot_{shot_id}",
                )
            )
            print(f"[SHOT_RENDER] {shot_id} -> image generated: {image_path}")
        except Exception as e:
            # Deliberate best-effort: any ComfyUI failure degrades to the
            # demo image rather than aborting the whole render.
            print(f"[WARN] Comfy failed, fallback to demo: {e}")
            image_path = generate_image_mock(prompt_text)

    # Ensure image exists before rendering.
    if not image_path or not os.path.exists(image_path):
        image_path = generate_image_mock(prompt_text)
    base_img = Image.open(image_path).convert("RGB")

    def make_frame(t: float):
        # Subtle zoom-in from 1.00 to ~1.03 over the clip duration.
        progress = float(t) / max(duration_s, 1e-6)
        progress = max(0.0, min(1.0, progress))
        scale = 1.0 + 0.03 * progress

        # Never scale below the target frame so the center crop always fits.
        new_w = max(w, int(w * scale))
        new_h = max(h, int(h * scale))

        frame = base_img.resize((new_w, new_h), Image.LANCZOS)
        # Center-crop back to (w, h) — the zoom happens around the middle.
        left = (new_w - w) // 2
        top = (new_h - h) // 2
        frame = frame.crop((left, top, left + w, top + h))
        return np.array(frame)

    # image -> video
    video = VideoClip(make_frame, duration=duration_s, has_constant_size=True)

    # optional audio -> clip
    audio_clip: AudioFileClip | None = None
    if audio_asset and os.path.exists(str(audio_asset.path)):
        audio_clip = AudioFileClip(str(audio_asset.path))
        video = video.with_audio(audio_clip)

    # output
    clip_out = clips_dir / f"shot_{shot_id}.mp4"
    print(f"[SHOT_RENDER] {shot_id} -> {clip_out}")
    try:
        video.write_videofile(
            str(clip_out),
            fps=fps,
            codec="libx264",
            audio_codec="aac",
            preset="veryfast",
            threads=2,
        )
    finally:
        # Always release ffmpeg resources, even if encoding raised; the
        # inner try/excepts keep cleanup failures from masking the real error.
        try:
            video.close()
        except Exception:
            pass
        if audio_clip is not None:
            try:
                audio_clip.close()
            except Exception:
                pass

    return str(clip_out)
|
|
|
|
|
|
def render_shot(
    shot: dict[str, Any],
    output_dir: str | Path,
    cfg: AppConfig | None = None,
    *,
    mock: bool = False,
) -> str:
    """Synchronous entry point: render one shot via the shared pipeline.

    When *cfg* is missing (or falsy), the default config is loaded from
    ``./configs/config.yaml``. Delegates to ``render_shot_pipeline`` and
    returns the path of the rendered clip.
    """
    effective_cfg = cfg if cfg else AppConfig.load("./configs/config.yaml")
    return render_shot_pipeline(shot, effective_cfg, output_dir, mock=mock)
|
|
|