From 8d0b729f2f5304bec3244eb70488d6616ebf2b2c Mon Sep 17 00:00:00 2001 From: Daniel Date: Tue, 7 Apr 2026 00:37:39 +0800 Subject: [PATCH] =?UTF-8?q?feat:=20=E6=96=B0=E5=A2=9E=E4=BB=A3=E7=A0=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- read.md | 590 +++++++++++++++++++ video_worker/.env.example | 17 + video_worker/README.md | 194 ++++++ video_worker/app/__init__.py | 0 video_worker/app/api.py | 48 ++ video_worker/app/backends/__init__.py | 0 video_worker/app/backends/base.py | 19 + video_worker/app/backends/hunyuan_backend.py | 59 ++ video_worker/app/backends/ltx_backend.py | 56 ++ video_worker/app/gpu_worker.py | 93 +++ video_worker/app/main.py | 53 ++ video_worker/app/model_router.py | 15 + video_worker/app/schemas.py | 44 ++ video_worker/app/settings.py | 30 + video_worker/app/task_manager.py | 102 ++++ video_worker/app/task_store.py | 156 +++++ video_worker/app/utils/__init__.py | 0 video_worker/app/utils/ffmpeg_utils.py | 40 ++ video_worker/app/utils/files.py | 24 + video_worker/app/utils/image_utils.py | 12 + video_worker/app/utils/logger.py | 24 + video_worker/requirements.txt | 12 + video_worker/scripts/install_windows_env.ps1 | 31 + video_worker/scripts/install_wsl_env.sh | 31 + video_worker/scripts/migrate_db.py | 10 + video_worker/scripts/run_server.bat | 18 + video_worker/scripts/run_server.ps1 | 20 + video_worker/scripts/run_server.sh | 22 + video_worker/scripts/smoke_test.py | 48 ++ 29 files changed, 1768 insertions(+) create mode 100644 read.md create mode 100644 video_worker/.env.example create mode 100644 video_worker/README.md create mode 100644 video_worker/app/__init__.py create mode 100644 video_worker/app/api.py create mode 100644 video_worker/app/backends/__init__.py create mode 100644 video_worker/app/backends/base.py create mode 100644 video_worker/app/backends/hunyuan_backend.py create mode 100644 video_worker/app/backends/ltx_backend.py create mode 100644 video_worker/app/gpu_worker.py 
create mode 100644 video_worker/app/main.py create mode 100644 video_worker/app/model_router.py create mode 100644 video_worker/app/schemas.py create mode 100644 video_worker/app/settings.py create mode 100644 video_worker/app/task_manager.py create mode 100644 video_worker/app/task_store.py create mode 100644 video_worker/app/utils/__init__.py create mode 100644 video_worker/app/utils/ffmpeg_utils.py create mode 100644 video_worker/app/utils/files.py create mode 100644 video_worker/app/utils/image_utils.py create mode 100644 video_worker/app/utils/logger.py create mode 100644 video_worker/requirements.txt create mode 100644 video_worker/scripts/install_windows_env.ps1 create mode 100755 video_worker/scripts/install_wsl_env.sh create mode 100644 video_worker/scripts/migrate_db.py create mode 100644 video_worker/scripts/run_server.bat create mode 100644 video_worker/scripts/run_server.ps1 create mode 100755 video_worker/scripts/run_server.sh create mode 100644 video_worker/scripts/smoke_test.py diff --git a/read.md b/read.md new file mode 100644 index 0000000..78a9f42 --- /dev/null +++ b/read.md @@ -0,0 +1,590 @@ +你是一个资深 Python / AI Infra / 推理服务工程师。你的任务不是讨论方案,而是**直接在本地完成一个可运行的项目**。 + +# 任务目标 + +在 **WSL2 + Ubuntu + NVIDIA RTX A4000 16GB** 环境下,开发一个 **本地单机视频生成 Worker 服务**,只做一件事: + +> 接收一次视频生成请求 → 根据模式选择模型 → 在 A4000 上执行推理 → 输出视频文件 → 返回任务状态与结果路径。 + +这个 Worker 是一个**边缘执行节点**,不是完整平台。 + +--- + +# 必须遵守的边界 + +## 只做这些 + +* 提供本地 HTTP API +* 接收 text-to-video 请求 +* 使用本地模型推理 +* 单任务串行执行 +* 输出固定目录结构 +* 支持两种模式: + + * `preview` → `LTX-Video` + * `refine` → `HunyuanVideo-1.5` + +## 不要做这些 + +* 不做脚本生成 +* 不做分镜拆解 +* 不做 prompt 自动生成 +* 不做剪辑 +* 不做 ComfyUI 集成 +* 不做多机调度 +* 不做前端页面 +* 不做复杂鉴权 +* 不做 image-to-video +* 不做 video extend +* 不做数据库集群 +* 不做消息队列中间件 + +--- + +# 最终交付物 + +你必须直接生成一个完整可运行项目,至少包含: + +1. 项目源码目录 +2. `requirements.txt` +3. `.env.example` +4. `README.md` +5. `scripts/install_wsl_env.sh` +6. `scripts/run_server.sh` +7. `scripts/smoke_test.py` +8. FastAPI 服务源码 +9. SQLite 任务存储 +10. 
单 worker 串行队列 +11. 模型路由器 +12. `LTX` backend +13. `Hunyuan` backend +14. 统一输出目录 +15. 基础日志 +16. 错误处理 +17. 健康检查接口 + +--- + +# 技术要求 + +## 基础技术栈 + +* Python 3.10+ +* FastAPI +* Uvicorn +* Pydantic +* SQLite +* asyncio +* ffmpeg +* torch +* diffusers / transformers / accelerate / safetensors(按需要引入) +* WSL2 下通过 CUDA 调用 A4000 + +## 代码要求 + +* 代码必须结构清晰 +* 所有核心模块必须可直接运行 +* 不要只写伪代码 +* 必须包含真实可执行代码骨架 +* 重要处要有注释 +* 要考虑异常处理 +* 要考虑模型懒加载 +* 要考虑单卡显存限制 +* 所有路径都要可配置 +* 配置统一放到 `settings.py` 和 `.env` + +--- + +# 系统架构 + +实现下面这个最小架构: + +```text +Client + -> FastAPI + -> TaskManager + -> ModelRouter + -> GPUWorker + -> Backend(LTX/Hunyuan) + -> OutputWriter + -> outputs/{task_id}/video.mp4 +``` + +--- + +# 模型路由规则 + +固定规则: + +* `quality_mode = "preview"` → `LTX-Video` +* `quality_mode = "refine"` → `HunyuanVideo-1.5` + +请把模型路由实现成独立模块,后续方便替换。 + +--- + +# 任务状态 + +只保留这些状态: + +* `PENDING` +* `RUNNING` +* `SUCCEEDED` +* `FAILED` + +不要额外扩展复杂状态机。 + +--- + +# API 设计 + +你必须实现以下接口。 + +## 1)创建任务 + +### `POST /generate` + +请求体: + +```json +{ + "prompt": "a lonely man walking in a rainy neon street, cinematic, handheld camera", + "negative_prompt": "blurry, deformed face, extra limbs, flicker", + "quality_mode": "preview", + "duration_sec": 5, + "width": 832, + "height": 480, + "fps": 16, + "steps": 8, + "seed": 123456 +} +``` + +要求: + +* 自动生成 `task_id` +* 写入 SQLite +* 入队 +* 返回 `task_id` 和状态 + +--- + +## 2)查询任务状态 + +### `GET /tasks/{task_id}` + +返回: + +* `task_id` +* `status` +* `backend` +* `model_name` +* `progress` +* `created_at` +* `updated_at` + +--- + +## 3)查询任务结果 + +### `GET /tasks/{task_id}/result` + +成功时返回: + +* `task_id` +* `status` +* `video_path` +* `first_frame_path` +* `metadata_path` +* `log_path` + +失败时返回: + +* `task_id` +* `status` +* `error` + +--- + +## 4)健康检查 + +### `GET /health` + +返回: + +* 服务状态 +* CUDA 是否可用 +* GPU 名称 +* 两个模型是否已加载 + +--- + +# 输入参数约束 + +第一版严格限制: + +* 只支持 text-to-video +* `duration_sec` 允许 1~5 +* `width` 最大 832 +* `height` 最大 480 +* `fps` 最大 24 +* `quality_mode` 只允许 
`preview` 或 `refine` + +你需要在 Pydantic schema 中严格校验。 + +--- + +# 输出目录规范 + +每个任务固定输出到: + +```text +outputs/{task_id}/ + ├─ video.mp4 + ├─ first_frame.jpg + ├─ metadata.json + └─ run.log +``` + +其中: + +## `metadata.json` + +至少包含: + +* task_id +* backend +* model_name +* prompt +* negative_prompt +* seed +* width +* height +* fps +* steps +* duration_sec +* status +* created_at +* started_at +* finished_at +* video_path + +--- + +# 目录结构要求 + +按下面结构生成项目: + +```text +video_worker/ +├─ app/ +│ ├─ main.py +│ ├─ api.py +│ ├─ schemas.py +│ ├─ settings.py +│ ├─ task_manager.py +│ ├─ model_router.py +│ ├─ gpu_worker.py +│ ├─ task_store.py +│ ├─ backends/ +│ │ ├─ base.py +│ │ ├─ ltx_backend.py +│ │ └─ hunyuan_backend.py +│ └─ utils/ +│ ├─ files.py +│ ├─ ffmpeg_utils.py +│ ├─ image_utils.py +│ └─ logger.py +├─ models/ +│ ├─ ltx/ +│ └─ hunyuan/ +├─ outputs/ +├─ runtime/ +│ ├─ tasks.db +│ └─ logs/ +├─ scripts/ +│ ├─ install_wsl_env.sh +│ ├─ run_server.sh +│ └─ smoke_test.py +├─ requirements.txt +├─ .env.example +└─ README.md +``` + +--- + +# 模块职责 + +## `schemas.py` + +定义: + +* `GenerateRequest` +* `TaskStatusResponse` +* `TaskResultResponse` +* `HealthResponse` + +--- + +## `task_store.py` + +使用 SQLite 存储任务信息,至少包含: + +* `task_id` +* `status` +* `backend` +* `model_name` +* `request_json` +* `output_dir` +* `error_message` +* `created_at` +* `updated_at` + +--- + +## `task_manager.py` + +负责: + +* 创建任务 +* 写入数据库 +* 入队 +* 更新状态 +* 查询状态与结果 + +--- + +## `gpu_worker.py` + +负责: + +* 后台单线程取任务 +* 标记任务为 RUNNING +* 调用路由器选择 backend +* 执行生成 +* 写入结果 +* 成功标记 SUCCEEDED +* 失败标记 FAILED + +要求: + +* 必须是**单任务串行** +* 不允许多任务并发占用 GPU + +--- + +## `model_router.py` + +根据 `quality_mode` 返回 `ltx_backend` 或 `hunyuan_backend` + +--- + +## `backends/base.py` + +定义统一接口: + +* `load()` +* `is_loaded()` +* `generate(task)` + +--- + +## `backends/ltx_backend.py` + +要求: + +* 实现懒加载 +* 作为默认 preview 模型 +* 重点保证代码结构正确 +* 若本地真实模型推理接入较复杂,可先封装清晰的推理入口与参数映射,但不要省略真实调用预留位 +* 输出必须符合统一目录规范 + +--- + +## `backends/hunyuan_backend.py` 
+ +要求: + +* 实现懒加载 +* 作为 refine 模型 +* 保留 `cpu offload`、`vae tiling` 等内存优化入口 +* 输出必须符合统一目录规范 + +--- + +## `utils/ffmpeg_utils.py` + +必须实现: + +* 如果 backend 输出帧序列,则合成为 `video.mp4` +* 从 `video.mp4` 抽取 `first_frame.jpg` + +--- + +# 运行原则 + +## 1)模型懒加载 + +* 服务启动时不要强制加载所有模型 +* 第一次调用对应 backend 时再加载 +* 加载后常驻 + +## 2)单任务串行 + +* A4000 16GB 只允许一个视频任务同时运行 +* 必须实现内存队列 + 单 worker + +## 3)先支持 preview,再支持 refine + +* 但代码结构里两个 backend 都要存在 +* 如果某个 backend 先以占位实现,也必须说明后续替换点 + +--- + +# 配置要求 + +在 `.env.example` 中提供以下配置项: + +```env +APP_HOST=0.0.0.0 +APP_PORT=8000 +OUTPUT_DIR=./outputs +RUNTIME_DIR=./runtime +SQLITE_PATH=./runtime/tasks.db + +LTX_MODEL_DIR=./models/ltx +HUNYUAN_MODEL_DIR=./models/hunyuan + +DEFAULT_WIDTH=832 +DEFAULT_HEIGHT=480 +DEFAULT_FPS=16 +DEFAULT_DURATION=5 +DEFAULT_STEPS_PREVIEW=8 +DEFAULT_STEPS_REFINE=12 +``` + +--- + +# README 必须包含 + +1. 项目说明 +2. 环境准备 +3. WSL + CUDA 检查方法 +4. 安装命令 +5. 启动命令 +6. 调用示例 +7. 目录说明 +8. API 说明 +9. 常见问题 +10. 已知限制 + +--- + +# 安装脚本要求 + +## `scripts/install_wsl_env.sh` + +需要完成: + +* 创建 Python venv +* 升级 pip +* 安装 requirements +* 安装 ffmpeg +* 创建输出目录 +* 创建 runtime 目录 +* 给出后续启动提示 + +--- + +# 启动脚本要求 + +## `scripts/run_server.sh` + +需要完成: + +* 激活 venv +* 检查 `.env` +* 启动 uvicorn + +--- + +# 烟雾测试要求 + +## `scripts/smoke_test.py` + +需要完成: + +* 调用 `/health` +* 创建一个 `preview` 任务 +* 轮询任务状态 +* 打印结果路径 + +--- + +# 开发顺序 + +严格按这个顺序实现: + +## Phase 1 + +* 建目录 +* 写 `settings.py` +* 写 `schemas.py` +* 写 `task_store.py` +* 写 `task_manager.py` +* 写基础 API +* 写 SQLite 初始化 +* 写单 worker 队列 + +## Phase 2 + +* 写 `model_router.py` +* 写 `backends/base.py` +* 写 `ltx_backend.py` +* 打通 preview 路由 +* 输出 `video.mp4 / metadata.json / run.log` + +## Phase 3 + +* 写 `hunyuan_backend.py` +* 打通 refine 路由 +* 增加 `/health` +* 增加 ffmpeg 抽帧 + +## Phase 4 + +* 完善 README +* 完善脚本 +* 完善错误处理 +* 完善 smoke test + +--- + +# 验收标准 + +你完成后,必须满足以下条件: + +1. 项目可直接启动 +2. `GET /health` 可用 +3. `POST /generate` 可创建任务 +4. 任务会进入 `PENDING -> RUNNING -> SUCCEEDED/FAILED` +5. preview 模式能真正走 `LTX backend` +6. 
refine 模式能真正走 `Hunyuan backend` +7. 输出目录结构正确 +8. `metadata.json` 完整 +9. 失败时有明确错误信息 +10. 代码结构足够清晰,后续方便替换真实模型实现 + +--- + +# 输出要求 + +你现在直接开始产出代码与文件内容,不要继续解释方案,不要重复需求,不要空谈架构。 + +请按下面顺序输出: + +1. 项目目录树 +2. 每个文件的完整内容 +3. 最后给出运行步骤 + +要求所有内容都尽可能完整,可复制使用。 diff --git a/video_worker/.env.example b/video_worker/.env.example new file mode 100644 index 0000000..a015d42 --- /dev/null +++ b/video_worker/.env.example @@ -0,0 +1,17 @@ +APP_HOST=0.0.0.0 +APP_PORT=8000 +OUTPUT_DIR=./outputs +RUNTIME_DIR=./runtime +SQLITE_PATH=./runtime/tasks.db + +LTX_MODEL_DIR=./models/ltx +HUNYUAN_MODEL_DIR=./models/hunyuan + +DEFAULT_WIDTH=832 +DEFAULT_HEIGHT=480 +DEFAULT_FPS=16 +DEFAULT_DURATION=5 +DEFAULT_STEPS_PREVIEW=8 +DEFAULT_STEPS_REFINE=12 + +LOG_LEVEL=INFO diff --git a/video_worker/README.md b/video_worker/README.md new file mode 100644 index 0000000..ba1ab26 --- /dev/null +++ b/video_worker/README.md @@ -0,0 +1,194 @@ +# Local Video Worker + +一个本地单机视频生成 Worker,提供最小化 HTTP API:接收任务、按模式路由模型、单任务串行执行、输出统一结果目录。 + +## 1. 项目说明 + +- 目标:边缘执行节点,不是完整平台。 +- 路由规则: + - `preview` -> `LTX-Video` + - `refine` -> `HunyuanVideo-1.5` +- 状态机:`PENDING` / `RUNNING` / `SUCCEEDED` / `FAILED` +- 当前后端是可执行骨架: + - 已实现懒加载、参数透传、输出规范、日志与错误处理 + - 真实模型推理请替换 `app/backends/ltx_backend.py` 与 `app/backends/hunyuan_backend.py` 中 `TODO` 位置 + +## 2. 环境准备 + +- Python 3.10+ +- ffmpeg +- NVIDIA GPU + CUDA(可选,健康检查会显示可用性) + +## 3. WSL + CUDA 检查方法 + +在 WSL Ubuntu 内执行: + +```bash +nvidia-smi +python -c "import torch; print(torch.cuda.is_available()); print(torch.cuda.get_device_name(0) if torch.cuda.is_available() else 'no gpu')" +``` + +## 4. 安装命令 + +### WSL / Linux + +```bash +cd video_worker +bash scripts/install_wsl_env.sh +cp .env.example .env # 若脚本未自动生成 +``` + +### Windows PowerShell + +```powershell +cd video_worker +.\scripts\install_windows_env.ps1 +``` + +## 5. 
启动命令 + +### WSL / Linux + +```bash +cd video_worker +bash scripts/run_server.sh +``` + +### Windows + +```powershell +cd video_worker +.\scripts\run_server.ps1 +``` + +或: + +```bat +scripts\run_server.bat +``` + +## 6. 调用示例 + +创建任务: + +```bash +curl -X POST http://127.0.0.1:8000/generate \ + -H "Content-Type: application/json" \ + -d '{ + "prompt": "a lonely man walking in a rainy neon street, cinematic, handheld camera", + "negative_prompt": "blurry, deformed face, extra limbs, flicker", + "quality_mode": "preview", + "duration_sec": 5, + "width": 832, + "height": 480, + "fps": 16, + "steps": 8, + "seed": 123456 + }' +``` + +轮询状态: + +```bash +curl http://127.0.0.1:8000/tasks/<task_id> +curl http://127.0.0.1:8000/tasks/<task_id>/result +``` + +烟雾测试: + +```bash +cd video_worker +. .venv/bin/activate # Windows: .\.venv\Scripts\Activate.ps1 +python scripts/smoke_test.py +``` + +## 7. 目录说明 + +```text +video_worker/ +├─ app/ +│ ├─ main.py +│ ├─ api.py +│ ├─ schemas.py +│ ├─ settings.py +│ ├─ task_manager.py +│ ├─ model_router.py +│ ├─ gpu_worker.py +│ ├─ task_store.py +│ ├─ backends/ +│ │ ├─ base.py +│ │ ├─ ltx_backend.py +│ │ └─ hunyuan_backend.py +│ └─ utils/ +│ ├─ files.py +│ ├─ ffmpeg_utils.py +│ ├─ image_utils.py +│ └─ logger.py +├─ models/ +│ ├─ ltx/ +│ └─ hunyuan/ +├─ outputs/ +├─ runtime/ +│ ├─ tasks.db +│ └─ logs/ +├─ scripts/ +│ ├─ install_wsl_env.sh +│ ├─ install_windows_env.ps1 +│ ├─ run_server.sh +│ ├─ run_server.ps1 +│ ├─ run_server.bat +│ ├─ migrate_db.py +│ └─ smoke_test.py +├─ requirements.txt +├─ .env.example +└─ README.md +``` + +## 8. API 说明 + +- `POST /generate` + - 创建任务并入队 +- `GET /tasks/{task_id}` + - 查询任务状态 +- `GET /tasks/{task_id}/result` + - 查询结果路径或错误 +- `GET /health` + - 服务状态、CUDA、GPU 名称、模型加载状态 + +参数限制: + +- `duration_sec`: 1~5 +- `width`: <= 832 +- `height`: <= 480 +- `fps`: <= 24 +- `quality_mode`: `preview` 或 `refine` + +## 9. 
@router.post("/generate", response_model=TaskStatusResponse)
async def create_generate_task(req: GenerateRequest):
    """Create a generation task, enqueue it, and return its initial status.

    The request body is validated by ``GenerateRequest`` before this handler
    runs; the task manager is attached to ``router`` at application start-up.
    """
    task_manager = router.task_manager
    return await task_manager.create_task(req)


@router.get("/tasks/{task_id}", response_model=TaskStatusResponse)
def get_task_status(task_id: str):
    """Return the current status of one task.

    Raises:
        HTTPException: 404 when the task manager reports the id as unknown.
    """
    task_manager = router.task_manager
    try:
        return task_manager.get_status(task_id)
    except KeyError as exc:
        # str(KeyError("msg")) is the repr of the message ("'msg'"), which
        # would leak stray quotes into the JSON `detail`; use the raw message.
        detail = str(exc.args[0]) if exc.args else f"Task not found: {task_id}"
        raise HTTPException(status_code=404, detail=detail) from exc


@router.get("/tasks/{task_id}/result", response_model=TaskResultResponse)
def get_task_result(task_id: str):
    """Return result paths (or the error message) recorded for one task.

    Raises:
        HTTPException: 404 when the task manager reports the id as unknown.
    """
    task_manager = router.task_manager
    try:
        return task_manager.get_result(task_id)
    except KeyError as exc:
        # Same KeyError quoting pitfall as in get_task_status.
        detail = str(exc.args[0]) if exc.args else f"Task not found: {task_id}"
        raise HTTPException(status_code=404, detail=detail) from exc
class HunyuanBackend(BaseVideoBackend):
    """Refine-quality backend (placeholder implementation for HunyuanVideo-1.5).

    The model is loaded lazily on the first ``generate`` call. Until a real
    pipeline is wired in, frames are synthesized with ``make_dummy_frame`` and
    encoded with ffmpeg so the rest of the service can be exercised end to end.
    """

    backend_name = "hunyuan_backend"
    model_name = "HunyuanVideo-1.5"

    def __init__(self, model_dir: Path, enable_cpu_offload: bool = True, enable_vae_tiling: bool = True):
        """Store configuration only; nothing is loaded here.

        Args:
            model_dir: Directory holding (or that will hold) the model weights.
            enable_cpu_offload: Memory-optimization switch reserved for the real pipeline.
            enable_vae_tiling: Memory-optimization switch reserved for the real pipeline.
        """
        self.model_dir = model_dir
        self.enable_cpu_offload = enable_cpu_offload
        self.enable_vae_tiling = enable_vae_tiling
        self._loaded = False
        self._pipeline = None

    def load(self) -> None:
        """Load the pipeline once; later calls are no-ops."""
        if self._loaded:
            return
        # TODO: Replace with real HunyuanVideo loading and memory optimization hooks.
        # Example hooks: self._pipeline.enable_model_cpu_offload(), self._pipeline.vae.enable_tiling()
        self.model_dir.mkdir(parents=True, exist_ok=True)
        self._pipeline = "hunyuan_pipeline_placeholder"
        self._loaded = True

    def is_loaded(self) -> bool:
        """Report whether ``load`` has completed."""
        return self._loaded

    def generate(self, task_id: str, request_data: Dict[str, Any], output_dir: str) -> Dict[str, str]:
        """Render the task's video and first frame into ``output_dir``.

        Args:
            task_id: Identifier of the task (not used by the placeholder renderer).
            request_data: Request fields; reads ``duration_sec``, ``fps``,
                ``width``, ``height`` and ``prompt``.
            output_dir: Per-task output directory.

        Returns:
            Absolute ``video_path`` and ``first_frame_path`` as strings.

        Raises:
            ValueError: If ``duration_sec * fps`` is not a positive frame count.
        """
        import shutil  # local import: only needed for intermediate-frame cleanup

        self.load()
        output = Path(output_dir)

        duration = int(request_data["duration_sec"])
        fps = int(request_data["fps"])
        width = int(request_data["width"])
        height = int(request_data["height"])
        prompt = request_data["prompt"]

        total_frames = duration * fps
        if total_frames < 1:
            # Fail early with a clear message instead of an opaque ffmpeg error.
            raise ValueError(f"duration_sec * fps must be positive, got {duration} * {fps}")

        frames_dir = output / "frames"
        frames_dir.mkdir(parents=True, exist_ok=True)
        video_path = output / TASK_VIDEO_NAME
        try:
            for i in range(total_frames):
                frame_path = frames_dir / f"frame_{i:04d}.jpg"
                make_dummy_frame(frame_path, width, height, f"Hunyuan refine | {prompt[:60]}", i)
            frames_to_video(str(frames_dir / "frame_%04d.jpg"), fps, video_path)
        finally:
            # The task directory has a fixed layout (video, first frame,
            # metadata, log); intermediate frames must not be left behind.
            shutil.rmtree(frames_dir, ignore_errors=True)

        first_frame_path = output / TASK_FIRST_FRAME_NAME
        extract_first_frame(video_path, first_frame_path)

        return {
            "video_path": str(video_path.resolve()),
            "first_frame_path": str(first_frame_path.resolve()),
        }
class GPUWorker:
    """Single background consumer that runs queued video tasks one at a time.

    Exactly one asyncio task drains ``task_manager.queue``, so at most one
    generation runs at any moment. Every failure while handling a task is
    recorded on that task and never terminates the consumer loop.
    """

    def __init__(self, task_manager: TaskManager, router: ModelRouter, log_level: str = "INFO"):
        """Keep collaborators; the loop itself is started by ``start``."""
        self.task_manager = task_manager
        self.router = router
        self.log_level = log_level
        self._runner: asyncio.Task | None = None
        self._stopped = asyncio.Event()
        self.logger = build_logger("gpu_worker", log_level=log_level)

    async def start(self) -> None:
        """Start the consumer loop unless it is already running."""
        if self._runner and not self._runner.done():
            return
        # Allow a restart after stop(): the loop exits while the flag is set.
        self._stopped.clear()
        self._runner = asyncio.create_task(self._run_loop(), name="gpu-worker-loop")

    async def stop(self) -> None:
        """Signal shutdown, cancel the loop, and wait for it to finish."""
        self._stopped.set()
        if self._runner:
            self._runner.cancel()
            try:
                await self._runner
            except asyncio.CancelledError:
                pass

    async def _run_loop(self) -> None:
        """Pull task ids from the queue and process them strictly in order."""
        while not self._stopped.is_set():
            task_id = await self.task_manager.queue.get()
            try:
                await self._process(task_id)
            except Exception:
                # Last line of defence: one bad task (for example a failing
                # status write) must not kill the only worker.
                # CancelledError is a BaseException and still propagates.
                self.logger.exception("Unhandled error while processing task %s", task_id)
            finally:
                self.task_manager.queue.task_done()

    @staticmethod
    def _release_task_logger(task_logger) -> None:
        """Close and detach handlers of a per-task logger.

        NOTE(review): assumes ``build_logger`` returns a stdlib ``logging.Logger``
        with its own handlers; objects without ``handlers`` are left untouched.
        """
        for handler in list(getattr(task_logger, "handlers", ())):
            handler.close()
            task_logger.removeHandler(handler)

    async def _process(self, task_id: str) -> None:
        """Run one task end to end and persist its outcome.

        On success writes ``metadata.json`` and marks the task SUCCEEDED; on
        any error (including routing or logger setup) marks it FAILED with the
        error text.
        """
        try:
            task = self.task_manager.get_task_record(task_id)
        except KeyError:
            # Nothing to mark as failed: the record does not exist.
            self.logger.error("Dequeued unknown task %s; skipping", task_id)
            return

        req = task.request_json
        task_logger = self.logger  # fallback until the per-task logger exists
        log_path_str: str | None = None

        try:
            log_path = self.task_manager.build_log_path(task)
            log_path_str = str(Path(log_path).resolve())
            task_logger = build_logger(f"task.{task_id}", log_level=self.log_level, log_file=log_path)

            backend = self.router.route(req["quality_mode"])
            self.task_manager.mark_running(task_id, backend.backend_name, backend.model_name)
            task_logger.info("Task started with backend=%s model=%s", backend.backend_name, backend.model_name)

            # The blocking generation runs in a worker thread so the event
            # loop keeps serving HTTP requests.
            await asyncio.to_thread(self.task_manager.mark_progress, task_id, 0.3)
            result = await asyncio.to_thread(backend.generate, task_id, req, task.output_dir)
            await asyncio.to_thread(self.task_manager.mark_progress, task_id, 0.8)

            metadata_path = self.task_manager.build_metadata_path(task)
            # Re-read the record to pick up started_at written by mark_running.
            current = self.task_manager.get_task_record(task_id)
            finished_at = datetime.now(timezone.utc).isoformat()
            metadata = {
                "task_id": task.task_id,
                "backend": backend.backend_name,
                "model_name": backend.model_name,
                "prompt": req.get("prompt"),
                "negative_prompt": req.get("negative_prompt"),
                "seed": req.get("seed"),
                "width": req.get("width"),
                "height": req.get("height"),
                "fps": req.get("fps"),
                "steps": req.get("steps"),
                "duration_sec": req.get("duration_sec"),
                "status": "SUCCEEDED",
                "created_at": task.created_at,
                "started_at": current.started_at,
                "finished_at": finished_at,
                "video_path": result["video_path"],
            }
            await asyncio.to_thread(write_json, metadata_path, metadata)

            self.task_manager.mark_succeeded(
                task_id=task_id,
                video_path=result["video_path"],
                first_frame_path=result["first_frame_path"],
                metadata_path=str(Path(metadata_path).resolve()),
                log_path=log_path_str,
            )
            task_logger.info("Task succeeded: %s", json.dumps(result, ensure_ascii=False))
        except Exception as exc:
            task_logger.exception("Task failed")
            self.task_manager.mark_failed(task_id, str(exc), log_path=log_path_str)
        finally:
            if task_logger is not self.logger:
                # Each task gets a uniquely named logger; release its handlers
                # so open log files do not accumulate over the process lifetime.
                self._release_task_logger(task_logger)
class ModelRouter:
    """Map a request's ``quality_mode`` onto the backend that serves it."""

    def __init__(self, ltx_backend: LTXBackend, hunyuan_backend: HunyuanBackend):
        self._ltx = ltx_backend
        self._hunyuan = hunyuan_backend

    def route(self, quality_mode: str):
        """Return the backend for ``quality_mode``.

        ``"preview"`` selects the LTX backend and ``"refine"`` the Hunyuan
        backend.

        Raises:
            ValueError: For any other mode.
        """
        by_mode = {
            "preview": self._ltx,
            "refine": self._hunyuan,
        }
        try:
            selected = by_mode[quality_mode]
        except (KeyError, TypeError):
            # TypeError covers unhashable inputs, which the comparison-based
            # original also rejected with ValueError.
            raise ValueError(f"Unsupported quality_mode: {quality_mode}") from None
        return selected
Literal["PENDING", "RUNNING", "SUCCEEDED", "FAILED"] + backend: Optional[str] = None + model_name: Optional[str] = None + progress: float = 0.0 + created_at: datetime + updated_at: datetime + + +class TaskResultResponse(BaseModel): + task_id: str + status: Literal["PENDING", "RUNNING", "SUCCEEDED", "FAILED"] + video_path: Optional[str] = None + first_frame_path: Optional[str] = None + metadata_path: Optional[str] = None + log_path: Optional[str] = None + error: Optional[str] = None + + +class HealthResponse(BaseModel): + service_status: str + cuda_available: bool + gpu_name: Optional[str] + ltx_loaded: bool + hunyuan_loaded: bool diff --git a/video_worker/app/settings.py b/video_worker/app/settings.py new file mode 100644 index 0000000..0bdd069 --- /dev/null +++ b/video_worker/app/settings.py @@ -0,0 +1,30 @@ +from pathlib import Path + +from pydantic import Field +from pydantic_settings import BaseSettings, SettingsConfigDict + + +class Settings(BaseSettings): + app_host: str = Field(default="0.0.0.0", alias="APP_HOST") + app_port: int = Field(default=8000, alias="APP_PORT") + + output_dir: Path = Field(default=Path("./outputs"), alias="OUTPUT_DIR") + runtime_dir: Path = Field(default=Path("./runtime"), alias="RUNTIME_DIR") + sqlite_path: Path = Field(default=Path("./runtime/tasks.db"), alias="SQLITE_PATH") + + ltx_model_dir: Path = Field(default=Path("./models/ltx"), alias="LTX_MODEL_DIR") + hunyuan_model_dir: Path = Field(default=Path("./models/hunyuan"), alias="HUNYUAN_MODEL_DIR") + + default_width: int = Field(default=832, alias="DEFAULT_WIDTH") + default_height: int = Field(default=480, alias="DEFAULT_HEIGHT") + default_fps: int = Field(default=16, alias="DEFAULT_FPS") + default_duration: int = Field(default=5, alias="DEFAULT_DURATION") + default_steps_preview: int = Field(default=8, alias="DEFAULT_STEPS_PREVIEW") + default_steps_refine: int = Field(default=12, alias="DEFAULT_STEPS_REFINE") + + log_level: str = Field(default="INFO", alias="LOG_LEVEL") + + 
class TaskManager:
    """Create tasks, queue them for the GPU worker, and track their state.

    Persistence is delegated to ``TaskStore``; this class owns the in-memory
    queue of task ids and the status transitions written to the store.
    """

    def __init__(self, store: TaskStore, output_root: Path):
        self.store = store
        self.output_root = output_root
        self.queue: asyncio.Queue[str] = asyncio.Queue()

    @staticmethod
    def _utc_now_iso() -> str:
        """Return the current time as a timezone-aware UTC ISO-8601 string.

        ``datetime.utcnow()`` yields naive values, while the store's
        ``created_at`` / ``updated_at`` and the worker's ``finished_at`` are
        aware; this keeps every timestamp on a task in one format.
        """
        from datetime import timezone  # local import: module imports only `datetime`

        return datetime.now(timezone.utc).isoformat()

    async def create_task(self, req: GenerateRequest) -> TaskStatusResponse:
        """Persist a new PENDING task, enqueue its id, and return its status."""
        task_id = uuid4().hex
        output_dir = task_output_dir(self.output_root, task_id)
        ensure_dir(output_dir)
        self.store.create_task(task_id=task_id, request_json=req.model_dump(), output_dir=str(output_dir.resolve()))
        await self.queue.put(task_id)
        return self.get_status(task_id)

    def get_task_record(self, task_id: str) -> TaskRecord:
        """Fetch the stored record.

        Raises:
            KeyError: If no task with ``task_id`` exists.
        """
        task = self.store.get_task(task_id)
        if task is None:
            raise KeyError(f"Task not found: {task_id}")
        return task

    def get_status(self, task_id: str) -> TaskStatusResponse:
        """Build the status response; raises ``KeyError`` for unknown ids."""
        task = self.get_task_record(task_id)
        return TaskStatusResponse(
            task_id=task.task_id,
            status=task.status,
            backend=task.backend,
            model_name=task.model_name,
            progress=task.progress,
            created_at=datetime.fromisoformat(task.created_at),
            updated_at=datetime.fromisoformat(task.updated_at),
        )

    def get_result(self, task_id: str) -> TaskResultResponse:
        """Build the result response; raises ``KeyError`` for unknown ids."""
        task = self.get_task_record(task_id)
        return TaskResultResponse(
            task_id=task.task_id,
            status=task.status,
            video_path=task.video_path,
            first_frame_path=task.first_frame_path,
            metadata_path=task.metadata_path,
            log_path=task.log_path,
            error=task.error_message,
        )

    def mark_running(self, task_id: str, backend: str, model_name: str) -> None:
        """Record that execution started on the given backend and model."""
        self.store.update_task(
            task_id,
            status="RUNNING",
            backend=backend,
            model_name=model_name,
            progress=0.1,
            started_at=self._utc_now_iso(),
        )

    def mark_progress(self, task_id: str, progress: float) -> None:
        """Store ``progress`` clamped to the range 0.0 - 1.0."""
        self.store.update_task(task_id, progress=max(0.0, min(1.0, progress)))

    def mark_succeeded(
        self,
        task_id: str,
        video_path: str,
        first_frame_path: str,
        metadata_path: str,
        log_path: str,
    ) -> None:
        """Record a successful run together with its artifact paths."""
        self.store.update_task(
            task_id,
            status="SUCCEEDED",
            progress=1.0,
            video_path=video_path,
            first_frame_path=first_frame_path,
            metadata_path=metadata_path,
            log_path=log_path,
            finished_at=self._utc_now_iso(),
        )

    def mark_failed(self, task_id: str, error_message: str, log_path: str | None = None) -> None:
        """Record a failed run; ``log_path`` is stored only when provided."""
        updates = {
            "status": "FAILED",
            "progress": 1.0,
            "error_message": error_message,
            "finished_at": self._utc_now_iso(),
        }
        if log_path is not None:
            updates["log_path"] = log_path
        self.store.update_task(task_id, **updates)

    def build_metadata_path(self, task: TaskRecord) -> Path:
        """Return the metadata file path inside the task's output directory."""
        return Path(task.output_dir) / TASK_METADATA_NAME

    def build_log_path(self, task: TaskRecord) -> Path:
        """Return the run-log file path inside the task's output directory."""
        return Path(task.output_dir) / TASK_LOG_NAME
class TaskStore:
    """SQLite-backed persistence for task rows and schema migration records.

    Every operation opens its own short-lived connection through ``conn()``.
    """

    # Columns of the ``tasks`` table created in ``migrate()``. ``update_task``
    # only accepts these names because column names are interpolated into SQL.
    _TASK_COLUMNS = frozenset(
        {
            "task_id",
            "status",
            "backend",
            "model_name",
            "request_json",
            "output_dir",
            "progress",
            "error_message",
            "video_path",
            "first_frame_path",
            "metadata_path",
            "log_path",
            "created_at",
            "updated_at",
            "started_at",
            "finished_at",
        }
    )

    def __init__(self, sqlite_path: Path):
        """Remember ``sqlite_path`` and create its parent directory if missing."""
        self.sqlite_path = sqlite_path
        self.sqlite_path.parent.mkdir(parents=True, exist_ok=True)

    @contextmanager
    def conn(self) -> Iterator[sqlite3.Connection]:
        """Yield a connection with ``sqlite3.Row`` rows.

        The transaction is committed when the ``with`` body finishes without
        raising; the connection is closed in every case.
        """
        connection = sqlite3.connect(self.sqlite_path, check_same_thread=False)
        connection.row_factory = sqlite3.Row
        try:
            yield connection
            connection.commit()
        finally:
            connection.close()

    def migrate(self) -> None:
        """Apply every schema migration newer than the recorded version.

        Version 1 creates the ``tasks`` table; version 2 adds indexes on
        ``status`` and ``created_at``. Each applied version is recorded in
        ``schema_migrations``.
        """
        with self.conn() as connection:
            connection.execute(
                """
                CREATE TABLE IF NOT EXISTS schema_migrations (
                    version INTEGER PRIMARY KEY,
                    applied_at TEXT NOT NULL
                )
                """
            )
            current = connection.execute("SELECT MAX(version) as v FROM schema_migrations").fetchone()["v"]
            # MAX() over an empty table yields NULL, which is treated as version 0.
            current_version = int(current or 0)

            if current_version < 1:
                connection.execute(
                    """
                    CREATE TABLE IF NOT EXISTS tasks (
                        task_id TEXT PRIMARY KEY,
                        status TEXT NOT NULL,
                        backend TEXT,
                        model_name TEXT,
                        request_json TEXT NOT NULL,
                        output_dir TEXT NOT NULL,
                        progress REAL NOT NULL DEFAULT 0,
                        error_message TEXT,
                        video_path TEXT,
                        first_frame_path TEXT,
                        metadata_path TEXT,
                        log_path TEXT,
                        created_at TEXT NOT NULL,
                        updated_at TEXT NOT NULL,
                        started_at TEXT,
                        finished_at TEXT
                    )
                    """
                )
                connection.execute(
                    "INSERT INTO schema_migrations(version, applied_at) VALUES (?, ?)",
                    (1, utc_now_iso()),
                )

            if current_version < 2:
                connection.execute("CREATE INDEX IF NOT EXISTS idx_tasks_status ON tasks(status)")
                connection.execute("CREATE INDEX IF NOT EXISTS idx_tasks_created_at ON tasks(created_at)")
                connection.execute(
                    "INSERT INTO schema_migrations(version, applied_at) VALUES (?, ?)",
                    (2, utc_now_iso()),
                )

    def create_task(self, task_id: str, request_json: Dict[str, Any], output_dir: str) -> None:
        """Insert a new PENDING task row; ``request_json`` is stored as JSON text."""
        now = utc_now_iso()
        with self.conn() as connection:
            connection.execute(
                """
                INSERT INTO tasks (
                    task_id, status, request_json, output_dir, created_at, updated_at
                ) VALUES (?, ?, ?, ?, ?, ?)
                """,
                (task_id, STATUS_PENDING, json.dumps(request_json, ensure_ascii=False), output_dir, now, now),
            )

    def get_task(self, task_id: str) -> Optional[TaskRecord]:
        """Return the record for ``task_id``, or ``None`` when no such row exists."""
        with self.conn() as connection:
            row = connection.execute("SELECT * FROM tasks WHERE task_id = ?", (task_id,)).fetchone()
            if row is None:
                return None
            return TaskRecord(
                task_id=row["task_id"],
                status=row["status"],
                backend=row["backend"],
                model_name=row["model_name"],
                request_json=json.loads(row["request_json"]),
                output_dir=row["output_dir"],
                progress=float(row["progress"]),
                error_message=row["error_message"],
                video_path=row["video_path"],
                first_frame_path=row["first_frame_path"],
                metadata_path=row["metadata_path"],
                log_path=row["log_path"],
                created_at=row["created_at"],
                updated_at=row["updated_at"],
                started_at=row["started_at"],
                finished_at=row["finished_at"],
            )

    def update_task(self, task_id: str, **fields: Any) -> None:
        """Update the given columns of ``task_id`` and refresh ``updated_at``.

        Calling without fields is a no-op. Values are bound as SQL parameters;
        column names cannot be bound, so they are checked against the known
        ``tasks`` columns before being placed into the statement.

        Raises:
            ValueError: if any keyword is not a column of the ``tasks`` table.
        """
        if not fields:
            return
        unknown = sorted(set(fields) - self._TASK_COLUMNS)
        if unknown:
            raise ValueError(f"Unknown task column(s): {', '.join(unknown)}")
        fields["updated_at"] = utc_now_iso()
        keys = sorted(fields)
        assignments = ", ".join(f"{key} = ?" for key in keys)
        values = [fields[key] for key in keys]
        values.append(task_id)
        with self.conn() as connection:
            connection.execute(f"UPDATE tasks SET {assignments} WHERE task_id = ?", values)

    def list_migrations(self) -> list[dict[str, Any]]:
        """Return the applied migrations as dicts, ordered by ascending version."""
        with self.conn() as connection:
            rows = connection.execute(
                "SELECT version, applied_at FROM schema_migrations ORDER BY version ASC"
            ).fetchall()
            return [{"version": row["version"], "applied_at": row["applied_at"]} for row in rows]
"video.mp4" +TASK_FIRST_FRAME_NAME = "first_frame.jpg" +TASK_METADATA_NAME = "metadata.json" +TASK_LOG_NAME = "run.log" + + +def ensure_dir(path: Path) -> Path: + path.mkdir(parents=True, exist_ok=True) + return path + + +def task_output_dir(base_output_dir: Path, task_id: str) -> Path: + return ensure_dir(base_output_dir / task_id) + + +def write_json(path: Path, data: Dict[str, Any]) -> None: + path.parent.mkdir(parents=True, exist_ok=True) + with path.open("w", encoding="utf-8") as f: + json.dump(data, f, ensure_ascii=False, indent=2) diff --git a/video_worker/app/utils/image_utils.py b/video_worker/app/utils/image_utils.py new file mode 100644 index 0000000..9004896 --- /dev/null +++ b/video_worker/app/utils/image_utils.py @@ -0,0 +1,12 @@ +from pathlib import Path + +from PIL import Image, ImageDraw, ImageFont + + +def make_dummy_frame(path: Path, width: int, height: int, text: str, step: int) -> None: + image = Image.new("RGB", (width, height), color=(25 + step * 5 % 200, 40, 60)) + draw = ImageDraw.Draw(image) + font = ImageFont.load_default() + draw.text((16, 16), text, fill=(240, 240, 240), font=font) + draw.text((16, 38), f"frame={step}", fill=(220, 220, 220), font=font) + image.save(path, format="JPEG", quality=90) diff --git a/video_worker/app/utils/logger.py b/video_worker/app/utils/logger.py new file mode 100644 index 0000000..b015d52 --- /dev/null +++ b/video_worker/app/utils/logger.py @@ -0,0 +1,24 @@ +import logging +from pathlib import Path + + +def build_logger(name: str, log_level: str = "INFO", log_file: Path | None = None) -> logging.Logger: + logger = logging.getLogger(name) + logger.setLevel(getattr(logging, log_level.upper(), logging.INFO)) + + if logger.handlers: + return logger + + formatter = logging.Formatter("%(asctime)s | %(levelname)s | %(name)s | %(message)s") + + stream_handler = logging.StreamHandler() + stream_handler.setFormatter(formatter) + logger.addHandler(stream_handler) + + if log_file is not None: + 
log_file.parent.mkdir(parents=True, exist_ok=True) + file_handler = logging.FileHandler(log_file, encoding="utf-8") + file_handler.setFormatter(formatter) + logger.addHandler(file_handler) + + return logger diff --git a/video_worker/requirements.txt b/video_worker/requirements.txt new file mode 100644 index 0000000..cb6536b --- /dev/null +++ b/video_worker/requirements.txt @@ -0,0 +1,12 @@ +fastapi==0.116.1 +uvicorn[standard]==0.35.0 +pydantic==2.11.7 +pydantic-settings==2.10.1 +python-dotenv==1.1.1 +torch==2.8.0 +transformers==4.55.4 +diffusers==0.34.0 +accelerate==1.10.1 +safetensors==0.6.2 +Pillow==11.3.0 +requests==2.32.4 diff --git a/video_worker/scripts/install_windows_env.ps1 b/video_worker/scripts/install_windows_env.ps1 new file mode 100644 index 0000000..2463ac3 --- /dev/null +++ b/video_worker/scripts/install_windows_env.ps1 @@ -0,0 +1,31 @@ +$ErrorActionPreference = "Stop" +$Root = Split-Path -Parent (Split-Path -Parent $MyInvocation.MyCommand.Path) +Set-Location $Root + +if (!(Get-Command py -ErrorAction SilentlyContinue) -and !(Get-Command python -ErrorAction SilentlyContinue)) { + throw "Python launcher (py) or python not found" +} + +if (Test-Path .venv) { + Write-Host ".venv already exists, reusing" +} else { + if (Get-Command py -ErrorAction SilentlyContinue) { + py -3 -m venv .venv + } else { + python -m venv .venv + } +} + +.\.venv\Scripts\Activate.ps1 +python -m pip install --upgrade pip +pip install -r requirements.txt + +if (!(Get-Command ffmpeg -ErrorAction SilentlyContinue)) { + Write-Warning "ffmpeg not found in PATH. Please install ffmpeg and ensure PATH is updated." 
#!/usr/bin/env bash
# Set up the video_worker environment on WSL/Linux: create the virtualenv,
# install Python requirements, make sure ffmpeg is available, create the
# runtime directories and seed .env from .env.example.
set -euo pipefail

# Resolve the project root (parent of this scripts/ directory) so the script
# works regardless of the caller's working directory.
ROOT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")/.." && pwd)"
cd "$ROOT_DIR"

if ! command -v python3 >/dev/null 2>&1; then
  echo "[ERROR] python3 not found" >&2
  exit 1
fi

python3 -m venv .venv
source .venv/bin/activate
python -m pip install --upgrade pip
pip install -r requirements.txt

# Only touch the system package manager when ffmpeg is actually missing; this
# avoids a needless sudo prompt and mirrors the check in the Windows installer.
if command -v ffmpeg >/dev/null 2>&1; then
  echo "[OK] ffmpeg already available, skipping apt-get"
elif command -v apt-get >/dev/null 2>&1; then
  sudo apt-get update
  sudo apt-get install -y ffmpeg
else
  echo "[WARN] apt-get unavailable, please install ffmpeg manually"
fi

mkdir -p outputs runtime runtime/logs models/ltx models/hunyuan

# Never overwrite an existing .env; only seed it on first install.
if [ ! -f .env ]; then
  cp .env.example .env
fi

echo "[OK] install completed"
echo "next: source .venv/bin/activate && bash scripts/run_server.sh"
# Start the API server on Windows: activate the virtualenv, export the
# KEY=VALUE pairs from .env into this process, then launch uvicorn.
$ErrorActionPreference = "Stop"
$Root = Split-Path -Parent (Split-Path -Parent $MyInvocation.MyCommand.Path)
Set-Location $Root

if (!(Test-Path .venv)) { throw ".venv not found, run scripts/install_windows_env.ps1 first" }
if (!(Test-Path .env)) { throw ".env not found, copy from .env.example" }

.\.venv\Scripts\Activate.ps1

Get-Content .env | ForEach-Object {
    # Skip comment lines and blank lines.
    if ($_ -match "^\s*#") { return }
    if ($_ -match "^\s*$") { return }

    # Split on the first "=" only so values may themselves contain "=".
    $parts = $_ -split "=", 2
    if ($parts.Length -ne 2) { return }

    $name = $parts[0].Trim()
    $value = $parts[1].Trim()

    # An empty name would make SetEnvironmentVariable throw; ignore such lines.
    if ($name -eq "") { return }

    # Remove one pair of matching surrounding quotes so that KEY="value" is
    # exported as value rather than with the quote characters included.
    if ($value.Length -ge 2) {
        $first = $value.Substring(0, 1)
        $last = $value.Substring($value.Length - 1, 1)
        if (($first -eq $last) -and (($first -eq '"') -or ($first -eq "'"))) {
            $value = $value.Substring(1, $value.Length - 2)
        }
    }

    [System.Environment]::SetEnvironmentVariable($name, $value, "Process")
}

# Fall back to the default bind address and port when .env does not set them.
$hostValue = if ($env:APP_HOST) { $env:APP_HOST } else { "0.0.0.0" }
$portValue = if ($env:APP_PORT) { $env:APP_PORT } else { "8000" }
python -m uvicorn app.main:app --host $hostValue --port $portValue
def main(poll_timeout_sec: float = 600.0, poll_interval_sec: float = 2.0) -> None:
    """Run an end-to-end smoke test against the server at ``BASE_URL``.

    Checks ``/health``, submits a small preview generation request, polls the
    task until it reaches ``SUCCEEDED`` or ``FAILED`` and prints its result.

    Args:
        poll_timeout_sec: Maximum time to wait for the task to finish.
        poll_interval_sec: Delay between two status polls.

    Raises:
        TimeoutError: if the task does not finish within ``poll_timeout_sec``.
        SystemExit: with code 1 when the task finishes as ``FAILED``, so that
            callers and CI see a non-zero exit status.
        requests.HTTPError: if any request returns an error status.
    """
    health = requests.get(f"{BASE_URL}/health", timeout=15)
    health.raise_for_status()
    print("[health]", json.dumps(health.json(), ensure_ascii=False))

    payload = {
        "prompt": "a lonely man walking in a rainy neon street, cinematic, handheld camera",
        "negative_prompt": "blurry, deformed face, extra limbs, flicker",
        "quality_mode": "preview",
        "duration_sec": 1,
        "width": 320,
        "height": 240,
        "fps": 8,
        "steps": 8,
        "seed": 123456,
    }

    created = requests.post(f"{BASE_URL}/generate", json=payload, timeout=30)
    created.raise_for_status()
    created_data = created.json()
    task_id = created_data["task_id"]
    print("[create]", json.dumps(created_data, ensure_ascii=False))

    # Monotonic clock: the deadline is unaffected by wall-clock adjustments.
    deadline = time.monotonic() + poll_timeout_sec
    while True:
        status = requests.get(f"{BASE_URL}/tasks/{task_id}", timeout=15)
        status.raise_for_status()
        status_data = status.json()
        print("[status]", json.dumps(status_data, ensure_ascii=False))
        if status_data["status"] in {"SUCCEEDED", "FAILED"}:
            break
        # Without a deadline a task stuck in a non-terminal state would make
        # the smoke test hang forever.
        if time.monotonic() >= deadline:
            raise TimeoutError(
                f"Task {task_id} did not finish within {poll_timeout_sec:.0f}s "
                f"(last status: {status_data['status']})"
            )
        time.sleep(poll_interval_sec)

    result = requests.get(f"{BASE_URL}/tasks/{task_id}/result", timeout=15)
    result.raise_for_status()
    print("[result]", json.dumps(result.json(), ensure_ascii=False, indent=2))

    # Print the result first so the error details are visible, then fail.
    if status_data["status"] == "FAILED":
        sys.exit(1)