Files
AI_A4000/video_worker

Local Video Worker

一个本地单机视频生成 Worker提供最小化 HTTP API接收任务、按模式路由模型、单任务串行执行、输出统一结果目录。

项目拆分(推荐部署方式)

已拆分为两个可独立部署的目录:

  • center_dispatch/中央调度项目HTTP 管理 + WS 下发)
  • edge_node/:边缘执行项目(本地推理 + 主动连中心)

1. 项目说明

  • 目标:边缘执行节点,不是完整平台。
  • 路由规则:
    • preview -> LTX-Video
    • refine -> HunyuanVideo-1.5
  • 状态机:PENDING / RUNNING / SUCCEEDED / FAILED
  • 当前后端是可执行骨架:
    • 已实现懒加载、参数透传、输出规范、日志与错误处理
    • 真实模型推理请替换 app/backends/ltx_backend.pyapp/backends/hunyuan_backend.pyTODO 位置

2. 环境准备

  • Python 3.10+
  • ffmpeg
  • NVIDIA GPU + CUDA可选健康检查会显示可用性

3. WSL + CUDA 检查方法

在 WSL Ubuntu 内执行:

nvidia-smi
python -c "import torch; print(torch.cuda.is_available()); print(torch.cuda.get_device_name(0) if torch.cuda.is_available() else 'no gpu')"

4. 安装命令

WSL / Linux

cd video_worker
bash scripts/install_wsl_env.sh
cp .env.example .env  # 若脚本未自动生成

Windows PowerShell

cd video_worker
.\scripts\install_windows_env.ps1

Windows 终端PowerShell里调用 WSL 安装并启动

在仓库里进入 video_worker 后执行(无图形界面、仅 CLI

cd video_worker
.\scripts\run_wsl_one_click.ps1

多个 WSL 发行版时指定名称(与 wsl -l -v 中一致):

.\scripts\run_wsl_one_click.ps1 -Distro Ubuntu-22.04

5. 启动命令

WSL / Linux

cd video_worker
bash scripts/run_server.sh

边缘设备一键接入(仅上游下发,边缘主动连中心):

cd video_worker
bash scripts/start_edge_device_local.sh

Windows + WSL 一键边缘设备启动(推荐):

cd video_worker
.\scripts\edge_device_wsl.ps1 -Action start

常用操作:

.\scripts\edge_device_wsl.ps1 -Action status
.\scripts\edge_device_wsl.ps1 -Action restart
.\scripts\edge_device_wsl.ps1 -Action stop

多发行版时可指定:

.\scripts\edge_device_wsl.ps1 -Action start -Distro Ubuntu-22.04

边缘设备停止 / 重启:

bash scripts/stop_edge_device_local.sh
bash scripts/restart_edge_device_local.sh

独立 WS 网关服务(远程推荐):

cd video_worker
bash scripts/run_ws_service.sh

Docker 部署HTTP -> WS 边缘下发):

cd video_worker
bash scripts/run_edge_dispatch_docker.sh

Windows

cd video_worker
.\scripts\run_server.ps1

独立 WS 网关服务:

cd video_worker
.\scripts\run_ws_service.ps1

或:

scripts\run_server.bat

6. 调用示例

创建任务:

curl -X POST http://127.0.0.1:8000/generate \
  -H "Content-Type: application/json" \
  -d '{
    "prompt": "a lonely man walking in a rainy neon street, cinematic, handheld camera",
    "negative_prompt": "blurry, deformed face, extra limbs, flicker",
    "quality_mode": "preview",
    "duration_sec": 5,
    "width": 832,
    "height": 480,
    "fps": 16,
    "steps": 8,
    "seed": 123456
  }'

轮询状态:

curl http://127.0.0.1:8000/tasks/<task_id>
curl http://127.0.0.1:8000/tasks/<task_id>/result

烟雾测试:

cd video_worker
. .venv/bin/activate  # Windows: .\.venv\Scripts\Activate.ps1
python scripts/smoke_test.py

7. 目录说明

video_worker/
├─ app/
│  ├─ main.py
│  ├─ api.py
│  ├─ schemas.py
│  ├─ settings.py
│  ├─ task_manager.py
│  ├─ model_router.py
│  ├─ gpu_worker.py
│  ├─ task_store.py
│  ├─ backends/
│  │  ├─ base.py
│  │  ├─ ltx_backend.py
│  │  └─ hunyuan_backend.py
│  ├─ utils/
│  │  ├─ files.py
│  │  ├─ ffmpeg_utils.py
│  │  ├─ image_utils.py
│  │  └─ logger.py
│  ├─ ws_service.py
│  └─ edge_dispatch_service.py
├─ models/
│  ├─ ltx/
│  └─ hunyuan/
├─ outputs/
├─ runtime/
│  ├─ tasks.db
│  └─ logs/
├─ scripts/
│  ├─ install_wsl_env.sh
│  ├─ install_windows_env.ps1
│  ├─ run_server.sh
│  ├─ run_ws_service.sh
│  ├─ run_server.ps1
│  ├─ run_ws_service.ps1
│  ├─ run_server.bat
│  ├─ run_ws_service.bat
│  ├─ run_edge_dispatch_docker.sh
│  ├─ stop_edge_dispatch_docker.sh
│  ├─ migrate_db.py
│  ├─ smoke_test.py
│  ├─ ws_smoke_test.py
│  └─ edge_device_client.py
├─ docker/
│  └─ edge-dispatch/
│     └─ Dockerfile
├─ docker-compose.edge-dispatch.yml
├─ .dockerignore
├─ requirements.txt
├─ .env.example
└─ README.md

8. API 说明

  • POST /generate
    • 创建任务并入队
  • GET /tasks/{task_id}
    • 查询任务状态
  • GET /tasks/{task_id}/result
    • 查询结果路径或错误
  • GET /health
    • 服务状态、CUDA、GPU 名称、模型加载状态
  • WS /ws/generate
    • 远程服务通过 WebSocket 触发任务并接收状态推送
  • POST /dispatch/generateedge_dispatch_service
    • 对外 HTTP 入口,触发 WS 下发给边缘设备
  • GET /dispatch/{dispatch_id}edge_dispatch_service
    • 查询调度任务状态和结果
  • POST /dispatch/{dispatch_id}/artifactsedge_dispatch_service
    • 边缘上传产物到中心,由中心服务直传 OSS返回 OSS URL
  • GET /devicesedge_dispatch_service
    • 查看在线边缘设备
  • WS /ws/edge/{device_id}edge_dispatch_service
    • 边缘设备接入通道
  • POST /devices/{device_id}/commandedge_dispatch_service
    • 通过 HTTP 下发设备运维指令(中心自动转 WS
  • GET /commands/{dispatch_id}edge_dispatch_service
    • 查询设备指令执行状态和结果

边缘设备 WS 控制指令(由上游下发到 edge_device_client.py

  • generate: 下发生成任务
  • update_code: 拉取最新代码(默认执行 git fetch --all && git checkout <branch> && git pull --ff-only origin <branch>,可通过 command 自定义)
  • restart_service: 执行边缘本地重启脚本
  • ping: 心跳探活,设备回 pong

HTTP 下发设备指令示例(推荐上游系统使用):

# 1) 更新边缘设备代码
curl -X POST http://<dispatch-host>:8020/devices/edge-a4000-01/command \
  -H "Content-Type: application/json" \
  -d '{
    "command": "update_code",
    "branch": "master"
  }'

# 2) 重启边缘设备服务
curl -X POST http://<dispatch-host>:8020/devices/edge-a4000-01/command \
  -H "Content-Type: application/json" \
  -d '{
    "command": "restart_service"
  }'

# 3) 设备心跳检查
curl -X POST http://<dispatch-host>:8020/devices/edge-a4000-01/command \
  -H "Content-Type: application/json" \
  -d '{
    "command": "ping"
  }'

# 4) 查询指令执行状态(使用上一步返回的 dispatch_id
curl http://<dispatch-host>:8020/commands/<dispatch_id>

参数限制:

  • duration_sec: 1~5
  • width: <= 832
  • height: <= 480
  • fps: <= 24
  • quality_mode: previewrefine

WebSocket 协议

连接地址:

ws://<host>:<port>/ws/generate

如果使用独立网关,默认是:

ws://<host>:8010/ws/generate

客户端发送(触发任务):

{
  "action": "generate",
  "payload": {
    "prompt": "a lonely man walking in a rainy neon street, cinematic, handheld camera",
    "negative_prompt": "blurry, deformed face, extra limbs, flicker",
    "quality_mode": "preview",
    "duration_sec": 1,
    "width": 320,
    "height": 240,
    "fps": 8,
    "steps": 8,
    "seed": 123456
  }
}

也支持只订阅已存在任务:

{
  "action": "watch",
  "task_id": "your_task_id"
}

服务端事件:

  • accepted: 已创建任务并入队
  • status: 状态变化推送(PENDING/RUNNING/SUCCEEDED/FAILED
  • result: 最终结果(成功路径或失败错误)
  • error: 请求错误或内部错误

WS 烟雾测试:

cd video_worker
. .venv/bin/activate  # Windows: .\.venv\Scripts\Activate.ps1
python scripts/ws_smoke_test.py

远程部署建议:

  1. Worker 服务运行在 8000 端口(scripts/run_server.sh)。
  2. WS 网关运行在 8010 端口(scripts/run_ws_service.sh)。
  3. .env 中设置 WORKER_BASE_URL 指向 Worker 地址(例如 http://127.0.0.1:8000 或内网地址)。

Docker 快速部署(推荐)

  1. 启动调度服务容器:
cd video_worker
cp .env.example .env
bash scripts/run_edge_dispatch_docker.sh
  1. 检查健康状态:
curl http://127.0.0.1:8020/health
  1. 在边缘设备上运行客户端(连接调度服务并执行本地 Worker
cd video_worker
. .venv/bin/activate
export DISPATCH_WS_URL=ws://<dispatch-host>:8020/ws/edge/edge-a4000-01
export WORKER_BASE_URL=http://127.0.0.1:8000
python scripts/edge_device_client.py
  1. 外部系统触发生成HTTP
curl -X POST http://<dispatch-host>:8020/dispatch/generate \
  -H "Content-Type: application/json" \
  -d '{
    "device_id": "edge-a4000-01",
    "request": {
      "prompt": "a lonely man walking in a rainy neon street, cinematic, handheld camera",
      "negative_prompt": "blurry, deformed face, extra limbs, flicker",
      "quality_mode": "preview",
      "duration_sec": 1,
      "width": 320,
      "height": 240,
      "fps": 8,
      "steps": 8,
      "seed": 123456
    }
  }'
  1. 查询调度状态:
curl http://<dispatch-host>:8020/dispatch/<dispatch_id>

OSS 直传链路(防止中心堆积)

  1. 边缘执行完成后将 video.mp4/first_frame.jpg/metadata.json/run.log 提交到:
    • POST /dispatch/{dispatch_id}/artifacts
  2. 中心服务不落地文件,直接流式上传到 OSS。
  3. 中心仅保存 artifact_urlsOSS URL外部系统通过 GET /dispatch/{dispatch_id} 获取结果。

需要在 .env 配置 OSS

OSS_ENABLED=true
OSS_ENDPOINT=https://oss-cn-hangzhou.aliyuncs.com
OSS_BUCKET=your-bucket
OSS_ACCESS_KEY_ID=your-ak
OSS_ACCESS_KEY_SECRET=your-sk
OSS_PUBLIC_BASE_URL=https://your-bucket.oss-cn-hangzhou.aliyuncs.com
OSS_PREFIX=video-worker

9. 常见问题

  • ffmpeg not found
    • WSL: sudo apt-get install -y ffmpeg
    • Windows: 安装 ffmpeg 并加入 PATH
  • torch.cuda.is_available() == False
    • 检查驱动、CUDA、WSL GPU 直通是否正常
  • 任务失败
    • 查看 outputs/{task_id}/run.log
    • 查看 /tasks/{task_id}/result 返回的 error

10. 已知限制

  • 当前后端默认输出演示视频(可执行骨架),未内置完整真实模型权重加载
  • 单进程单 worker 串行执行,不支持多卡并行
  • SQLite 用于单机场景

迁移支持(数据库)

项目内置 schema version 迁移:

  • 启动服务时自动执行迁移
  • 也可手动执行:
python scripts/migrate_db.py

迁移记录存储在 schema_migrations 表,便于后续版本升级与跨环境迁移。