import express from "express"; import { spawn } from "node:child_process"; import path from "node:path"; import { fileURLToPath } from "node:url"; import fs from "node:fs"; import crypto from "node:crypto"; const __filename = fileURLToPath(import.meta.url); const __dirname = path.dirname(__filename); const repoRoot = path.resolve(__dirname, ".."); const outputsDir = path.join(repoRoot, "outputs"); fs.mkdirSync(outputsDir, { recursive: true }); const app = express(); app.use(express.json({ limit: "2mb" })); app.use( "/api/static", express.static(outputsDir, { fallthrough: true, setHeaders: (res) => { // Important: avoid stale video preview. res.setHeader("Cache-Control", "no-cache, no-transform"); }, }) ); app.use("/assets", express.static(path.join(repoRoot, "assets"))); app.use(express.static(path.join(__dirname, "public"))); app.get("/api/health", (_req, res) => { res.setHeader("Cache-Control", "no-cache"); res.status(200).json({ ok: true }); }); app.get("/api/tasks/:task_id", (req, res) => { const taskId = String(req.params.task_id || "").trim(); if (!taskId) return res.status(400).json({ error: "missing task_id" }); const p = path.join(outputsDir, taskId, "task.json"); if (!fs.existsSync(p)) return res.status(404).json({ error: "task not found", task_id: taskId }); try { const raw = fs.readFileSync(p, "utf8"); const data = JSON.parse(raw); return res.json(data); } catch (e) { return res.status(500).json({ error: "failed to read task", task_id: taskId, detail: String(e) }); } }); function sseHeaders(res) { res.setHeader("Content-Type", "text/event-stream; charset=utf-8"); res.setHeader("Cache-Control", "no-cache, no-transform"); res.setHeader("Connection", "keep-alive"); // Nginx proxies may buffer unless disabled; harmless in dev. res.setHeader("X-Accel-Buffering", "no"); } function sseSend(res, event, data) { if (event) res.write(`event: ${event}\n`); const lines = String(data).split(/\r?\n/); for (const line of lines) res.write(`data: ${line}\n`); res.write("\n"); } function sseStageUpdate(res, payload) { // Unified schema for frontend stage rendering. const safe = { schema_version: 1, stage: String(payload && payload.stage ? payload.stage : "Unknown"), progress: payload && typeof payload.progress === "number" && Number.isFinite(payload.progress) ? Math.max(0, Math.min(1, payload.progress)) : null, scene_index: payload && Number.isFinite(payload.scene_index) ? Number(payload.scene_index) : null, scene_json: payload && payload.scene_json && typeof payload.scene_json === "object" ? payload.scene_json : null, shot_id: payload && payload.shot_id ? String(payload.shot_id) : null, shot_status: payload && payload.shot_status ? String(payload.shot_status) : null, message: payload && payload.message ? String(payload.message) : "", timestamp: Date.now(), }; sseSend(res, "stage_update", JSON.stringify(safe)); } function newTaskId() { // `crypto.randomUUID()` exists on newer Node versions; fall back for older runtimes. if (crypto && typeof crypto.randomUUID === "function") return crypto.randomUUID(); const bytes = crypto.randomBytes(16); // UUID v4: set version and variant bits. bytes[6] = (bytes[6] & 0x0f) | 0x40; bytes[8] = (bytes[8] & 0x3f) | 0x80; const hex = bytes.toString("hex"); return `${hex.slice(0, 8)}-${hex.slice(8, 12)}-${hex.slice(12, 16)}-${hex.slice(16, 20)}-${hex.slice(20)}`; } function taskDir(taskId) { return path.join(outputsDir, taskId); } function ensureTaskDir(taskId) { const dir = taskDir(taskId); fs.mkdirSync(dir, { recursive: true }); return dir; } function spawnPythonStep({ step, prompt, configPath, mock, globalStyle, character, taskId, sceneIndex, llmProvider, imageProvider, imageFallbackProvider, }) { const py = process.env.PYTHON_BIN || "python3.10"; const args = [ "-m", "engine.main", "--prompt", prompt, "--config", configPath, "--step", step, "--task-id", taskId, ]; if (sceneIndex) args.push("--scene-index", String(sceneIndex)); if (globalStyle) args.push("--global-style", globalStyle); if (character) args.push("--character", character); if (mock) args.push("--mock"); const childEnv = { ...process.env }; if (llmProvider) childEnv.ENGINE_LLM_PROVIDER = String(llmProvider).trim(); if (imageProvider) childEnv.ENGINE_IMAGE_PROVIDER = String(imageProvider).trim(); if (imageFallbackProvider) childEnv.ENGINE_IMAGE_FALLBACK_PROVIDER = String(imageFallbackProvider).trim(); return spawn(py, args, { cwd: repoRoot, env: childEnv, stdio: ["pipe", "pipe", "pipe"] }); } app.get("/api/script", (req, res) => { const prompt = String(req.query.prompt || "").trim(); const mock = String(req.query.mock || "1") === "1"; const globalStyle = String(req.query.global_style || "").trim(); const character = String(req.query.character || "").trim(); const configPath = String(req.query.config || "./configs/config.yaml"); const llmProvider = String(req.query.llm_provider || "").trim(); const imageProvider = String(req.query.image_provider || "").trim(); const imageFallbackProvider = String(req.query.image_fallback_provider || "").trim(); if (!prompt) { res.status(400).json({ error: "missing prompt" }); return; } const taskId = newTaskId(); ensureTaskDir(taskId); sseHeaders(res); sseSend(res, "task", JSON.stringify({ task_id: taskId })); sseSend(res, "status", "starting"); const child = spawnPythonStep({ step: "script", prompt, configPath, mock, globalStyle, character, taskId, llmProvider, imageProvider, imageFallbackProvider, }); let buf = ""; let sceneCount = 0; child.stdout.setEncoding("utf8"); child.stdout.on("data", (chunk) => { buf += chunk; const parts = buf.split(/\r?\n/); buf = parts.pop() || ""; for (const line of parts) { if (!line) continue; if (line.startsWith("SCENE_JSON ")) { try { const scene = JSON.parse(line.slice("SCENE_JSON ".length)); sceneCount += 1; sseStageUpdate(res, { stage: "Script", scene_index: Number(scene.index || sceneCount) - 1, scene_json: scene, progress: Math.min(0.9, sceneCount / 3), message: "scene_generated", }); } catch { sseSend(res, "line", line); } } else if (line.startsWith("PROG ")) { try { const p = JSON.parse(line.slice("PROG ".length)); sseStageUpdate(res, { stage: "Script", progress: Number(p.p || 0), message: p.msg || "" }); } catch { sseSend(res, "line", line); } } else { sseSend(res, "line", line); } } }); child.stderr.setEncoding("utf8"); child.stderr.on("data", (chunk) => { // stderr can contain non-fatal logs/warnings; keep as a normal line event. sseSend(res, "line", "[stderr] " + chunk); }); req.on("close", () => { child.kill("SIGTERM"); }); child.on("exit", (code) => { if (buf.trim()) sseSend(res, "line", buf.trim()); if (code !== 0) sseSend(res, "error", `[ERROR] python exit_code=${code}`); sseSend(res, "done", String(code != null ? code : 0)); res.end(); }); }); app.post("/api/refine", (req, res) => { const prompt = String((req.body && req.body.prompt) || "").trim(); const sceneIndex = Number((req.body && req.body.scene_index) || 1); const scenes = req.body && req.body.scenes; const scene = req.body && req.body.scene; const mock = Boolean((req.body && req.body.mock) != null ? req.body.mock : true); const globalStyle = String((req.body && req.body.global_style) || "").trim(); const character = String((req.body && req.body.character) || "").trim(); const configPath = String((req.body && req.body.config) || "./configs/config.yaml"); const llmProvider = String((req.body && req.body.llm_provider) || "").trim(); const imageProvider = String((req.body && req.body.image_provider) || "").trim(); const imageFallbackProvider = String((req.body && req.body.image_fallback_provider) || "").trim(); const taskId = String((req.body && req.body.task_id) || "").trim() || newTaskId(); if (!prompt) return res.status(400).json({ error: "missing prompt" }); if (!Number.isFinite(sceneIndex) || sceneIndex < 1) return res.status(400).json({ error: "bad scene_index" }); if (!Array.isArray(scenes) && (!scene || typeof scene !== "object")) { return res.status(400).json({ error: "missing scene or scenes[]" }); } ensureTaskDir(taskId); sseHeaders(res); sseSend(res, "task", JSON.stringify({ task_id: taskId })); sseStageUpdate(res, { stage: "Refine", progress: 0.05, message: "refine_start" }); const child = spawnPythonStep({ step: "refine", prompt, configPath, mock, globalStyle, character, taskId, sceneIndex, llmProvider, imageProvider, imageFallbackProvider, }); if (Array.isArray(scenes)) { child.stdin.end(JSON.stringify({ scenes })); } else { child.stdin.end(JSON.stringify({ scene })); } let out = ""; let err = ""; let buf = ""; child.stdout.setEncoding("utf8"); child.stderr.setEncoding("utf8"); child.stdout.on("data", (chunk) => { out += chunk; buf += chunk; const parts = buf.split(/\r?\n/); buf = parts.pop() || ""; for (const line of parts) { if (!line) continue; if (line.startsWith("SCENE_JSON ")) { try { const scenePayload = JSON.parse(line.slice("SCENE_JSON ".length)); sseStageUpdate(res, { stage: "Refine", progress: 1, scene_index: Number(scenePayload.index || sceneIndex) - 1, scene_json: scenePayload, message: "scene_refined", }); } catch { sseSend(res, "line", line); } } else if (line.startsWith("PROG ")) { try { const p = JSON.parse(line.slice("PROG ".length)); sseStageUpdate(res, { stage: "Refine", progress: Number(p.p || 0), message: p.msg || "" }); } catch { sseSend(res, "line", line); } } else { sseSend(res, "line", line); } } }); child.stderr.on("data", (c) => (err += c)); child.on("exit", (code) => { if (buf.trim()) sseSend(res, "line", buf.trim()); if (err.trim()) sseSend(res, "line", "[stderr] " + err.trim()); if (code !== 0) { sseStageUpdate(res, { stage: "Refine", progress: null, message: `refine_failed(exit=${code})` }); sseSend(res, "error", `[ERROR] python exit_code=${code}`); return res.end(); } sseStageUpdate(res, { stage: "Refine", progress: 1, message: "refine_done" }); sseSend(res, "done", JSON.stringify({ exit_code: code != null ? code : 0, task_id: taskId })); return res.end(); }); }); let isBusy = false; app.post("/api/render", (req, res) => { const prompt = String((req.body && req.body.prompt) || "").trim(); const scenes = req.body && req.body.scenes; const mock = Boolean((req.body && req.body.mock) != null ? req.body.mock : false); const globalStyle = String((req.body && req.body.global_style) || "").trim(); const character = String((req.body && req.body.character) || "").trim(); const configPath = String((req.body && req.body.config) || "./configs/config.yaml"); const llmProvider = String((req.body && req.body.llm_provider) || "").trim(); const imageProvider = String((req.body && req.body.image_provider) || "").trim(); const imageFallbackProvider = String((req.body && req.body.image_fallback_provider) || "").trim(); const taskId = String((req.body && req.body.task_id) || "").trim() || newTaskId(); if (!prompt) return res.status(400).json({ error: "missing prompt" }); if (!Array.isArray(scenes)) return res.status(400).json({ error: "missing scenes[]" }); ensureTaskDir(taskId); if (isBusy) { return res.status(429).json({ error: "busy", msg: "GPU is busy, try later" }); } isBusy = true; sseHeaders(res); sseSend(res, "task", JSON.stringify({ task_id: taskId })); sseSend(res, "status", "render_start"); const child = spawnPythonStep({ step: "render", prompt, configPath, mock, globalStyle, character, taskId, llmProvider, imageProvider, imageFallbackProvider, }); child.stdin.end(JSON.stringify({ scenes })); let buf = ""; child.stdout.setEncoding("utf8"); child.stderr.setEncoding("utf8"); child.stdout.on("data", (chunk) => { buf += chunk; const parts = buf.split(/\r?\n/); buf = parts.pop() || ""; for (const line of parts) { if (!line) continue; if (line.startsWith("PROG ")) { try { const p = JSON.parse(line.slice("PROG ".length)); sseStageUpdate(res, { stage: "Render", progress: Number(p.p || 0), message: p.msg || "" }); } catch { sseSend(res, "line", line); } } else if (line.startsWith("PROG_SHOT ")) { const rest = line.slice("PROG_SHOT ".length).trim(); const firstSpace = rest.indexOf(" "); if (firstSpace > 0) { const shotId = rest.slice(0, firstSpace).trim(); const status = rest.slice(firstSpace + 1).trim(); sseStageUpdate(res, { stage: "Render", progress: null, shot_id: shotId, shot_status: status, message: "shot_progress", }); } else { sseSend(res, "line", line); } } else if (line.startsWith("RENDER_DONE ")) sseSend(res, "done", line.slice("RENDER_DONE ".length)); else sseSend(res, "line", line); } }); child.stderr.on("data", (chunk) => { // stderr can contain non-fatal logs/warnings; keep as a normal line event. sseSend(res, "line", "[stderr] " + chunk); }); req.on("close", () => { child.kill("SIGTERM"); }); child.on("exit", (code) => { isBusy = false; if (buf.trim()) sseSend(res, "line", buf.trim()); if (code !== 0) sseSend(res, "error", `[ERROR] python exit_code=${code}`); res.end(); }); }); async function runSelfCheck() { const py = process.env.PYTHON_BIN || "python3.10"; const selfCheckTimeoutMs = Number(process.env.SELF_CHECK_TIMEOUT_MS || 300000); const workflowApiPath = path.join(repoRoot, "workflow_api.json"); const requireWorkflow = String(process.env.REQUIRE_WORKFLOW || "").trim() === "1"; const checks = [{ name: "check_comfy", args: ["scripts/check_comfy.py"] }]; if (fs.existsSync(workflowApiPath) || requireWorkflow) { checks.push({ name: "inspect_comfy_node", args: ["scripts/inspect_comfy_node.py"] }); } else { console.warn( `[server self-check] workflow_api.json not found at ${workflowApiPath}; skipping inspect_comfy_node (set REQUIRE_WORKFLOW=1 to enforce).` ); } for (const c of checks) { const deadline = Date.now() + selfCheckTimeoutMs; let lastErr = ""; while (Date.now() < deadline) { try { await new Promise((resolve, reject) => { const child = spawn(py, c.args, { cwd: repoRoot, env: process.env, stdio: ["ignore", "pipe", "pipe"] }); let out = ""; let err = ""; child.stdout.setEncoding("utf8"); child.stderr.setEncoding("utf8"); child.stdout.on("data", (d) => (out += d)); child.stderr.on("data", (d) => (err += d)); child.on("exit", (code) => { if (code === 0) return resolve(true); reject(new Error(`${c.name} failed (code=${code})\n${err || out}`)); }); }); lastErr = ""; break; } catch (e) { lastErr = String(e); await new Promise((r) => setTimeout(r, 2000)); } } if (lastErr) { throw new Error(lastErr); } } } const port = Number(process.env.PORT || 3000); (async () => { try { await runSelfCheck(); app.listen(port, () => { console.log(`[server] http://127.0.0.1:${port}`); }); } catch (e) { console.error(String(e)); process.exit(1); } })();