From e606b3dcd65815c0145281d33760df8284f30e59 Mon Sep 17 00:00:00 2001 From: Daniel Date: Tue, 7 Apr 2026 01:00:56 +0800 Subject: [PATCH] =?UTF-8?q?fix:=E4=BC=98=E5=8C=96=E6=9C=8D=E5=8A=A1?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- video_worker/.dockerignore | 7 + video_worker/.env.example | 6 + video_worker/README.md | 188 +++++++++++++- video_worker/app/api.py | 90 ++++++- video_worker/app/edge_dispatch_service.py | 232 ++++++++++++++++++ video_worker/app/settings.py | 6 + video_worker/app/ws_service.py | 87 +++++++ video_worker/docker-compose.edge-dispatch.yml | 15 ++ video_worker/docker/edge-dispatch/Dockerfile | 15 ++ video_worker/requirements.txt | 1 + video_worker/scripts/edge_device_client.py | 88 +++++++ video_worker/scripts/one_click_wsl.sh | 23 ++ .../scripts/run_edge_dispatch_docker.sh | 14 ++ video_worker/scripts/run_ws_service.bat | 18 ++ video_worker/scripts/run_ws_service.ps1 | 20 ++ video_worker/scripts/run_ws_service.sh | 21 ++ video_worker/scripts/run_wsl_one_click.ps1 | 31 +++ .../scripts/stop_edge_dispatch_docker.sh | 7 + video_worker/scripts/ws_smoke_test.py | 37 +++ 19 files changed, 899 insertions(+), 7 deletions(-) create mode 100644 video_worker/.dockerignore create mode 100644 video_worker/app/edge_dispatch_service.py create mode 100644 video_worker/app/ws_service.py create mode 100644 video_worker/docker-compose.edge-dispatch.yml create mode 100644 video_worker/docker/edge-dispatch/Dockerfile create mode 100644 video_worker/scripts/edge_device_client.py create mode 100644 video_worker/scripts/one_click_wsl.sh create mode 100755 video_worker/scripts/run_edge_dispatch_docker.sh create mode 100644 video_worker/scripts/run_ws_service.bat create mode 100644 video_worker/scripts/run_ws_service.ps1 create mode 100755 video_worker/scripts/run_ws_service.sh create mode 100644 video_worker/scripts/run_wsl_one_click.ps1 create mode 100755 video_worker/scripts/stop_edge_dispatch_docker.sh create mode 100644 video_worker/scripts/ws_smoke_test.py diff --git a/video_worker/.dockerignore b/video_worker/.dockerignore new file mode 100644 index 0000000..58baa9f --- /dev/null +++ b/video_worker/.dockerignore @@ -0,0 +1,7 @@ +.venv +__pycache__ +*.pyc +.git +runtime/ +outputs/ +models/ diff --git a/video_worker/.env.example b/video_worker/.env.example index a015d42..e447043 100644 --- a/video_worker/.env.example +++ b/video_worker/.env.example @@ -1,5 +1,11 @@ APP_HOST=0.0.0.0 APP_PORT=8000 +WS_GATEWAY_HOST=0.0.0.0 +WS_GATEWAY_PORT=8010 +WORKER_BASE_URL=http://127.0.0.1:8000 +WS_POLL_INTERVAL_SEC=1.0 +EDGE_DISPATCH_HOST=0.0.0.0 +EDGE_DISPATCH_PORT=8020 OUTPUT_DIR=./outputs RUNTIME_DIR=./runtime SQLITE_PATH=./runtime/tasks.db diff --git a/video_worker/README.md b/video_worker/README.md index ba1ab26..4a056de 100644 --- a/video_worker/README.md +++ b/video_worker/README.md @@ -45,6 +45,21 @@ cd video_worker .\scripts\install_windows_env.ps1 ``` +### Windows 终端(PowerShell)里调用 WSL 安装并启动 + +在仓库里进入 `video_worker` 后执行(无图形界面、仅 CLI): + +```powershell +cd video_worker +.\scripts\run_wsl_one_click.ps1 +``` + +多个 WSL 发行版时指定名称(与 `wsl -l -v` 中一致): + +```powershell +.\scripts\run_wsl_one_click.ps1 -Distro Ubuntu-22.04 +``` + ## 5. 启动命令 ### WSL / Linux @@ -54,6 +69,20 @@ cd video_worker bash scripts/run_server.sh ``` +独立 WS 网关服务(远程推荐): + +```bash +cd video_worker +bash scripts/run_ws_service.sh +``` + +Docker 部署(HTTP -> WS 边缘下发): + +```bash +cd video_worker +bash scripts/run_edge_dispatch_docker.sh +``` + ### Windows ```powershell @@ -61,6 +90,13 @@ cd video_worker .\scripts\run_server.ps1 ``` +独立 WS 网关服务: + +```powershell +cd video_worker +.\scripts\run_ws_service.ps1 +``` + 或: ```bat @@ -119,11 +155,13 @@ video_worker/ │ │ ├─ base.py │ │ ├─ ltx_backend.py │ │ └─ hunyuan_backend.py -│ └─ utils/ -│ ├─ files.py -│ ├─ ffmpeg_utils.py -│ ├─ image_utils.py -│ └─ logger.py +│ ├─ utils/ +│ │ ├─ files.py +│ │ ├─ ffmpeg_utils.py +│ │ ├─ image_utils.py +│ │ └─ logger.py +│ ├─ ws_service.py +│ └─ edge_dispatch_service.py ├─ models/ │ ├─ ltx/ │ └─ hunyuan/ @@ -135,10 +173,22 @@ video_worker/ │ ├─ install_wsl_env.sh │ ├─ install_windows_env.ps1 │ ├─ run_server.sh +│ ├─ run_ws_service.sh │ ├─ run_server.ps1 +│ ├─ run_ws_service.ps1 │ ├─ run_server.bat +│ ├─ run_ws_service.bat +│ ├─ run_edge_dispatch_docker.sh +│ ├─ stop_edge_dispatch_docker.sh │ ├─ migrate_db.py -│ └─ smoke_test.py +│ ├─ smoke_test.py +│ ├─ ws_smoke_test.py +│ └─ edge_device_client.py +├─ docker/ +│ └─ edge-dispatch/ +│ └─ Dockerfile +├─ docker-compose.edge-dispatch.yml +├─ .dockerignore ├─ requirements.txt ├─ .env.example └─ README.md @@ -154,6 +204,16 @@ video_worker/ - 查询结果路径或错误 - `GET /health` - 服务状态、CUDA、GPU 名称、模型加载状态 +- `WS /ws/generate` + - 远程服务通过 WebSocket 触发任务并接收状态推送 +- `POST /dispatch/generate`(edge_dispatch_service) + - 对外 HTTP 入口,触发 WS 下发给边缘设备 +- `GET /dispatch/{dispatch_id}`(edge_dispatch_service) + - 查询调度任务状态和结果 +- `GET /devices`(edge_dispatch_service) + - 查看在线边缘设备 +- `WS /ws/edge/{device_id}`(edge_dispatch_service) + - 边缘设备接入通道 参数限制: @@ -163,6 +223,122 @@ video_worker/ - `fps`: <= 24 - `quality_mode`: `preview` 或 `refine` +### WebSocket 协议 + +连接地址: + +```text +ws://:/ws/generate +``` + +如果使用独立网关,默认是: + +```text +ws://:8010/ws/generate +``` + +客户端发送(触发任务): + +```json +{ + "action": "generate", + "payload": { + "prompt": "a lonely man walking in a rainy neon street, cinematic, handheld camera", + "negative_prompt": "blurry, deformed face, extra limbs, flicker", + "quality_mode": "preview", + "duration_sec": 1, + "width": 320, + "height": 240, + "fps": 8, + "steps": 8, + "seed": 123456 + } +} +``` + +也支持只订阅已存在任务: + +```json +{ + "action": "watch", + "task_id": "your_task_id" +} +``` + +服务端事件: + +- `accepted`: 已创建任务并入队 +- `status`: 状态变化推送(`PENDING/RUNNING/SUCCEEDED/FAILED`) +- `result`: 最终结果(成功路径或失败错误) +- `error`: 请求错误或内部错误 + +WS 烟雾测试: + +```bash +cd video_worker +. .venv/bin/activate # Windows: .\.venv\Scripts\Activate.ps1 +python scripts/ws_smoke_test.py +``` + +远程部署建议: + +1. Worker 服务运行在 `8000` 端口(`scripts/run_server.sh`)。 +2. WS 网关运行在 `8010` 端口(`scripts/run_ws_service.sh`)。 +3. 在 `.env` 中设置 `WORKER_BASE_URL` 指向 Worker 地址(例如 `http://127.0.0.1:8000` 或内网地址)。 + +### Docker 快速部署(推荐) + +1. 启动调度服务容器: + +```bash +cd video_worker +cp .env.example .env +bash scripts/run_edge_dispatch_docker.sh +``` + +2. 检查健康状态: + +```bash +curl http://127.0.0.1:8020/health +``` + +3. 在边缘设备上运行客户端(连接调度服务并执行本地 Worker): + +```bash +cd video_worker +. .venv/bin/activate +export DISPATCH_WS_URL=ws://:8020/ws/edge/edge-a4000-01 +export WORKER_BASE_URL=http://127.0.0.1:8000 +python scripts/edge_device_client.py +``` + +4. 外部系统触发生成(HTTP): + +```bash +curl -X POST http://:8020/dispatch/generate \ + -H "Content-Type: application/json" \ + -d '{ + "device_id": "edge-a4000-01", + "request": { + "prompt": "a lonely man walking in a rainy neon street, cinematic, handheld camera", + "negative_prompt": "blurry, deformed face, extra limbs, flicker", + "quality_mode": "preview", + "duration_sec": 1, + "width": 320, + "height": 240, + "fps": 8, + "steps": 8, + "seed": 123456 + } + }' +``` + +5. 查询调度状态: + +```bash +curl http://:8020/dispatch/ +``` + ## 9. 常见问题 - `ffmpeg not found` diff --git a/video_worker/app/api.py b/video_worker/app/api.py index b6fa713..9a1be70 100644 --- a/video_worker/app/api.py +++ b/video_worker/app/api.py @@ -1,4 +1,7 @@ -from fastapi import APIRouter, HTTPException +import asyncio +from typing import Any + +from fastapi import APIRouter, HTTPException, WebSocket, WebSocketDisconnect from app.schemas import GenerateRequest, HealthResponse, TaskResultResponse, TaskStatusResponse @@ -46,3 +49,88 @@ def health_check(): ltx_loaded=ltx_backend.is_loaded(), hunyuan_loaded=hunyuan_backend.is_loaded(), ) + + +@router.websocket("/ws/generate") +async def ws_generate(websocket: WebSocket): + await websocket.accept() + task_manager = router.task_manager + + try: + message: dict[str, Any] = await websocket.receive_json() + action = message.get("action") + + if action == "watch": + task_id = message.get("task_id") + if not task_id: + await websocket.send_json({"event": "error", "error": "task_id is required for watch"}) + await websocket.close(code=1008) + return + else: + payload = message.get("payload", message) + try: + req = GenerateRequest.model_validate(payload) + except Exception as exc: + await websocket.send_json({"event": "error", "error": f"invalid request: {exc}"}) + await websocket.close(code=1008) + return + + created = await task_manager.create_task(req) + task_id = created.task_id + await websocket.send_json( + { + "event": "accepted", + "task_id": task_id, + "status": created.status, + "backend": created.backend, + "model_name": created.model_name, + "progress": created.progress, + "created_at": created.created_at.isoformat(), + "updated_at": created.updated_at.isoformat(), + } + ) + + last_status = None + while True: + status = task_manager.get_status(task_id) + if status.status != last_status: + await websocket.send_json( + { + "event": "status", + "task_id": status.task_id, + "status": status.status, + "backend": status.backend, + "model_name": status.model_name, + "progress": status.progress, + "created_at": status.created_at.isoformat(), + "updated_at": status.updated_at.isoformat(), + } + ) + last_status = status.status + + if status.status in {"SUCCEEDED", "FAILED"}: + result = task_manager.get_result(task_id) + await websocket.send_json( + { + "event": "result", + "task_id": result.task_id, + "status": result.status, + "video_path": result.video_path, + "first_frame_path": result.first_frame_path, + "metadata_path": result.metadata_path, + "log_path": result.log_path, + "error": result.error, + } + ) + await websocket.close(code=1000) + break + + await asyncio.sleep(1) + except WebSocketDisconnect: + return + except KeyError as exc: + await websocket.send_json({"event": "error", "error": str(exc)}) + await websocket.close(code=1008) + except Exception as exc: + await websocket.send_json({"event": "error", "error": f"unexpected error: {exc}"}) + await websocket.close(code=1011) diff --git a/video_worker/app/edge_dispatch_service.py b/video_worker/app/edge_dispatch_service.py new file mode 100644 index 0000000..9a8ac94 --- /dev/null +++ b/video_worker/app/edge_dispatch_service.py @@ -0,0 +1,232 @@ +import asyncio +from dataclasses import dataclass +from datetime import datetime, timezone +from typing import Any, Optional +from uuid import uuid4 + +from fastapi import FastAPI, HTTPException, WebSocket, WebSocketDisconnect +from pydantic import BaseModel + +from app.schemas import GenerateRequest + + +def utc_now_iso() -> str: + return datetime.now(timezone.utc).isoformat() + + +class DispatchGenerateRequest(BaseModel): + device_id: Optional[str] = None + request: GenerateRequest + + +class DispatchResponse(BaseModel): + dispatch_id: str + device_id: str + status: str + created_at: str + + +@dataclass +class EdgeConnection: + device_id: str + websocket: WebSocket + connected_at: str + last_seen: str + busy: bool = False + + +class EdgeDispatchManager: + def __init__(self) -> None: + self.connections: dict[str, EdgeConnection] = {} + self.dispatches: dict[str, dict[str, Any]] = {} + self.lock = asyncio.Lock() + + async def register(self, device_id: str, websocket: WebSocket) -> EdgeConnection: + async with self.lock: + conn = EdgeConnection( + device_id=device_id, + websocket=websocket, + connected_at=utc_now_iso(), + last_seen=utc_now_iso(), + busy=False, + ) + self.connections[device_id] = conn + return conn + + async def unregister(self, device_id: str) -> None: + async with self.lock: + self.connections.pop(device_id, None) + + async def list_devices(self) -> list[dict[str, Any]]: + async with self.lock: + return [ + { + "device_id": conn.device_id, + "connected_at": conn.connected_at, + "last_seen": conn.last_seen, + "busy": conn.busy, + } + for conn in self.connections.values() + ] + + async def select_device(self, preferred: Optional[str]) -> EdgeConnection: + async with self.lock: + if preferred: + conn = self.connections.get(preferred) + if conn is None: + raise HTTPException(status_code=404, detail=f"device not found: {preferred}") + if conn.busy: + raise HTTPException(status_code=409, detail=f"device is busy: {preferred}") + return conn + + for conn in self.connections.values(): + if not conn.busy: + return conn + + raise HTTPException(status_code=409, detail="no idle edge device available") + + async def create_dispatch(self, conn: EdgeConnection, req: GenerateRequest) -> dict[str, Any]: + dispatch_id = uuid4().hex + now = utc_now_iso() + record = { + "dispatch_id": dispatch_id, + "device_id": conn.device_id, + "status": "DISPATCHED", + "request": req.model_dump(), + "created_at": now, + "updated_at": now, + "result": None, + "error": None, + } + + async with self.lock: + self.dispatches[dispatch_id] = record + target = self.connections.get(conn.device_id) + if target is None: + raise HTTPException(status_code=404, detail=f"device disconnected: {conn.device_id}") + target.busy = True + target.last_seen = now + + payload = { + "event": "generate", + "dispatch_id": dispatch_id, + "request": req.model_dump(), + } + try: + await conn.websocket.send_json(payload) + except Exception as exc: + async with self.lock: + record["status"] = "FAILED" + record["error"] = f"dispatch send failed: {exc}" + record["updated_at"] = utc_now_iso() + target = self.connections.get(conn.device_id) + if target: + target.busy = False + raise + return record + + async def mark_event(self, device_id: str, event: dict[str, Any]) -> None: + now = utc_now_iso() + async with self.lock: + conn = self.connections.get(device_id) + if conn: + conn.last_seen = now + + dispatch_id = event.get("dispatch_id") + if not dispatch_id: + return + + record = self.dispatches.get(dispatch_id) + if record is None: + return + + evt = event.get("event") + if evt == "accepted": + record["status"] = "RUNNING" + elif evt == "status": + status_val = event.get("status") + if status_val: + record["status"] = status_val + elif evt == "result": + status_val = event.get("status") or "SUCCEEDED" + record["status"] = status_val + record["result"] = event + if conn: + conn.busy = False + elif evt == "error": + record["status"] = "FAILED" + record["error"] = event.get("error") + if conn: + conn.busy = False + + record["updated_at"] = now + + async def mark_disconnect_failed(self, device_id: str) -> None: + now = utc_now_iso() + async with self.lock: + for record in self.dispatches.values(): + if record["device_id"] == device_id and record["status"] in {"DISPATCHED", "RUNNING", "PENDING"}: + record["status"] = "FAILED" + record["error"] = f"device disconnected: {device_id}" + record["updated_at"] = now + + +manager = EdgeDispatchManager() +app = FastAPI(title="Edge Dispatch Service", version="0.1.0") + + +@app.get("/health") +async def health() -> dict[str, Any]: + devices = await manager.list_devices() + return { + "service": "edge_dispatch", + "status": "ok", + "connected_devices": len(devices), + } + + +@app.get("/devices") +async def list_devices() -> dict[str, Any]: + return {"devices": await manager.list_devices()} + + +@app.post("/dispatch/generate", response_model=DispatchResponse) +async def dispatch_generate(body: DispatchGenerateRequest) -> DispatchResponse: + conn = await manager.select_device(body.device_id) + try: + record = await manager.create_dispatch(conn, body.request) + except WebSocketDisconnect as exc: + await manager.unregister(conn.device_id) + raise HTTPException(status_code=503, detail=f"device disconnected during dispatch: {conn.device_id}") from exc + except RuntimeError as exc: + raise HTTPException(status_code=503, detail=str(exc)) from exc + + return DispatchResponse( + dispatch_id=record["dispatch_id"], + device_id=record["device_id"], + status=record["status"], + created_at=record["created_at"], + ) + + +@app.get("/dispatch/{dispatch_id}") +async def get_dispatch(dispatch_id: str) -> dict[str, Any]: + record = manager.dispatches.get(dispatch_id) + if record is None: + raise HTTPException(status_code=404, detail=f"dispatch not found: {dispatch_id}") + return record + + +@app.websocket("/ws/edge/{device_id}") +async def edge_socket(websocket: WebSocket, device_id: str) -> None: + await websocket.accept() + await manager.register(device_id, websocket) + + try: + await websocket.send_json({"event": "registered", "device_id": device_id, "ts": utc_now_iso()}) + while True: + msg = await websocket.receive_json() + await manager.mark_event(device_id, msg) + except WebSocketDisconnect: + await manager.mark_disconnect_failed(device_id) + await manager.unregister(device_id) diff --git a/video_worker/app/settings.py b/video_worker/app/settings.py index 0bdd069..34540a2 100644 --- a/video_worker/app/settings.py +++ b/video_worker/app/settings.py @@ -7,6 +7,12 @@ from pydantic_settings import BaseSettings, SettingsConfigDict class Settings(BaseSettings): app_host: str = Field(default="0.0.0.0", alias="APP_HOST") app_port: int = Field(default=8000, alias="APP_PORT") + ws_gateway_host: str = Field(default="0.0.0.0", alias="WS_GATEWAY_HOST") + ws_gateway_port: int = Field(default=8010, alias="WS_GATEWAY_PORT") + worker_base_url: str = Field(default="http://127.0.0.1:8000", alias="WORKER_BASE_URL") + ws_poll_interval_sec: float = Field(default=1.0, alias="WS_POLL_INTERVAL_SEC") + edge_dispatch_host: str = Field(default="0.0.0.0", alias="EDGE_DISPATCH_HOST") + edge_dispatch_port: int = Field(default=8020, alias="EDGE_DISPATCH_PORT") output_dir: Path = Field(default=Path("./outputs"), alias="OUTPUT_DIR") runtime_dir: Path = Field(default=Path("./runtime"), alias="RUNTIME_DIR") diff --git a/video_worker/app/ws_service.py b/video_worker/app/ws_service.py new file mode 100644 index 0000000..c0dad70 --- /dev/null +++ b/video_worker/app/ws_service.py @@ -0,0 +1,87 @@ +import asyncio +from typing import Any + +import requests +from fastapi import FastAPI, WebSocket, WebSocketDisconnect + +from app.schemas import GenerateRequest +from app.settings import settings + +app = FastAPI(title="Video Worker WS Gateway", version="0.1.0") + + +def _url(path: str) -> str: + return f"{settings.worker_base_url.rstrip('/')}{path}" + + +def _http_get(path: str) -> dict[str, Any]: + resp = requests.get(_url(path), timeout=20) + resp.raise_for_status() + return resp.json() + + +def _http_post(path: str, payload: dict[str, Any]) -> dict[str, Any]: + resp = requests.post(_url(path), json=payload, timeout=30) + resp.raise_for_status() + return resp.json() + + +@app.get("/health") +def health() -> dict[str, Any]: + gateway_status = "ok" + upstream_ok = False + upstream_error = None + try: + _http_get("/health") + upstream_ok = True + except Exception as exc: + upstream_error = str(exc) + return { + "service": "ws_gateway", + "status": gateway_status, + "worker_base_url": settings.worker_base_url, + "upstream_ok": upstream_ok, + "upstream_error": upstream_error, + } + + +@app.websocket("/ws/generate") +async def ws_generate(websocket: WebSocket) -> None: + await websocket.accept() + + try: + message = await websocket.receive_json() + action = message.get("action") + + if action == "watch": + task_id = message.get("task_id") + if not task_id: + await websocket.send_json({"event": "error", "error": "task_id is required for watch"}) + await websocket.close(code=1008) + return + else: + payload = message.get("payload", message) + req = GenerateRequest.model_validate(payload) + created = await asyncio.to_thread(_http_post, "/generate", req.model_dump()) + task_id = created["task_id"] + await websocket.send_json({"event": "accepted", **created}) + + last_status = None + while True: + status = await asyncio.to_thread(_http_get, f"/tasks/{task_id}") + if status.get("status") != last_status: + await websocket.send_json({"event": "status", **status}) + last_status = status.get("status") + + if status.get("status") in {"SUCCEEDED", "FAILED"}: + result = await asyncio.to_thread(_http_get, f"/tasks/{task_id}/result") + await websocket.send_json({"event": "result", **result}) + await websocket.close(code=1000) + return + + await asyncio.sleep(settings.ws_poll_interval_sec) + except WebSocketDisconnect: + return + except Exception as exc: + await websocket.send_json({"event": "error", "error": str(exc)}) + await websocket.close(code=1011) diff --git a/video_worker/docker-compose.edge-dispatch.yml b/video_worker/docker-compose.edge-dispatch.yml new file mode 100644 index 0000000..10c9567 --- /dev/null +++ b/video_worker/docker-compose.edge-dispatch.yml @@ -0,0 +1,15 @@ +services: + edge-dispatch: + build: + context: . + dockerfile: docker/edge-dispatch/Dockerfile + container_name: edge-dispatch-service + env_file: + - .env + environment: + EDGE_DISPATCH_HOST: 0.0.0.0 + EDGE_DISPATCH_PORT: ${EDGE_DISPATCH_PORT:-8020} + command: ["python", "-m", "uvicorn", "app.edge_dispatch_service:app", "--host", "0.0.0.0", "--port", "${EDGE_DISPATCH_PORT:-8020}"] + ports: + - "${EDGE_DISPATCH_PORT:-8020}:${EDGE_DISPATCH_PORT:-8020}" + restart: unless-stopped diff --git a/video_worker/docker/edge-dispatch/Dockerfile b/video_worker/docker/edge-dispatch/Dockerfile new file mode 100644 index 0000000..6219406 --- /dev/null +++ b/video_worker/docker/edge-dispatch/Dockerfile @@ -0,0 +1,15 @@ +FROM python:3.10-slim + +ENV PYTHONDONTWRITEBYTECODE=1 \ + PYTHONUNBUFFERED=1 + +WORKDIR /app + +COPY requirements.txt /app/requirements.txt +RUN pip install --no-cache-dir -r /app/requirements.txt + +COPY app /app/app + +EXPOSE 8020 + +CMD ["python", "-m", "uvicorn", "app.edge_dispatch_service:app", "--host", "0.0.0.0", "--port", "8020"] diff --git a/video_worker/requirements.txt b/video_worker/requirements.txt index cb6536b..06d1f01 100644 --- a/video_worker/requirements.txt +++ b/video_worker/requirements.txt @@ -10,3 +10,4 @@ accelerate==1.10.1 safetensors==0.6.2 Pillow==11.3.0 requests==2.32.4 +websockets==15.0.1 diff --git a/video_worker/scripts/edge_device_client.py b/video_worker/scripts/edge_device_client.py new file mode 100644 index 0000000..f3530f5 --- /dev/null +++ b/video_worker/scripts/edge_device_client.py @@ -0,0 +1,88 @@ +import asyncio +import json +import os +import sys +import time + +import requests +import websockets + +DISPATCH_WS_URL = os.getenv("DISPATCH_WS_URL", "ws://127.0.0.1:8020/ws/edge/edge-a4000-01") +WORKER_BASE_URL = os.getenv("WORKER_BASE_URL", "http://127.0.0.1:8000") +POLL_INTERVAL = float(os.getenv("EDGE_POLL_INTERVAL_SEC", "1.0")) + +if len(sys.argv) > 1: + DISPATCH_WS_URL = sys.argv[1] + + +def worker_post(path: str, payload: dict): + r = requests.post(f"{WORKER_BASE_URL}{path}", json=payload, timeout=30) + r.raise_for_status() + return r.json() + + +def worker_get(path: str): + r = requests.get(f"{WORKER_BASE_URL}{path}", timeout=20) + r.raise_for_status() + return r.json() + + +async def handle_generate(ws, data: dict): + dispatch_id = data["dispatch_id"] + req = data["request"] + + created = await asyncio.to_thread(worker_post, "/generate", req) + task_id = created["task_id"] + await ws.send(json.dumps({"event": "accepted", "dispatch_id": dispatch_id, "task_id": task_id}, ensure_ascii=False)) + + while True: + status = await asyncio.to_thread(worker_get, f"/tasks/{task_id}") + await ws.send( + json.dumps( + { + "event": "status", + "dispatch_id": dispatch_id, + "task_id": task_id, + "status": status["status"], + "progress": status.get("progress", 0.0), + }, + ensure_ascii=False, + ) + ) + if status["status"] in {"SUCCEEDED", "FAILED"}: + result = await asyncio.to_thread(worker_get, f"/tasks/{task_id}/result") + await ws.send( + json.dumps( + { + "event": "result", + "dispatch_id": dispatch_id, + "task_id": task_id, + **result, + }, + ensure_ascii=False, + ) + ) + return + await asyncio.sleep(POLL_INTERVAL) + + +async def main() -> None: + while True: + try: + async with websockets.connect(DISPATCH_WS_URL, max_size=2**22) as ws: + print(f"connected: {DISPATCH_WS_URL}") + while True: + raw = await ws.recv() + data = json.loads(raw) + event = data.get("event") + if event == "generate": + await handle_generate(ws, data) + elif event == "registered": + print("registered", data) + except Exception as exc: + print("connection error, retry in 3s:", exc) + time.sleep(3) + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/video_worker/scripts/one_click_wsl.sh b/video_worker/scripts/one_click_wsl.sh new file mode 100644 index 0000000..f5043dc --- /dev/null +++ b/video_worker/scripts/one_click_wsl.sh @@ -0,0 +1,23 @@ +#!/usr/bin/env bash +set -euo pipefail + +ROOT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")/.." && pwd)" +cd "$ROOT_DIR" + +if ! command -v python3 >/dev/null 2>&1; then + echo "[ERROR] python3 not found in WSL environment" + exit 1 +fi + +if [ ! -d .venv ]; then + echo "[INFO] .venv not found, running install_wsl_env.sh" + bash scripts/install_wsl_env.sh +fi + +if [ ! -f .env ]; then + echo "[INFO] .env not found, copying from .env.example" + cp .env.example .env +fi + +echo "[INFO] launching server in WSL" +exec bash scripts/run_server.sh diff --git a/video_worker/scripts/run_edge_dispatch_docker.sh b/video_worker/scripts/run_edge_dispatch_docker.sh new file mode 100755 index 0000000..486ef49 --- /dev/null +++ b/video_worker/scripts/run_edge_dispatch_docker.sh @@ -0,0 +1,14 @@ +#!/usr/bin/env bash +set -euo pipefail + +ROOT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")/.." && pwd)" +cd "$ROOT_DIR" + +if [ ! -f .env ]; then + cp .env.example .env +fi + +docker compose -f docker-compose.edge-dispatch.yml up -d --build + +echo "[OK] edge dispatch service started" +echo "health: curl http://127.0.0.1:${EDGE_DISPATCH_PORT:-8020}/health" diff --git a/video_worker/scripts/run_ws_service.bat b/video_worker/scripts/run_ws_service.bat new file mode 100644 index 0000000..ccb4e53 --- /dev/null +++ b/video_worker/scripts/run_ws_service.bat @@ -0,0 +1,18 @@ +@echo off +setlocal +cd /d %~dp0\.. +if not exist .venv ( + echo [ERROR] .venv not found, run install script first + exit /b 1 +) +if not exist .env ( + echo [ERROR] .env not found, copy from .env.example + exit /b 1 +) +call .venv\Scripts\activate.bat +for /f "usebackq tokens=1,* delims==" %%A in (".env") do ( + if not "%%A"=="" set "%%A=%%B" +) +if "%WS_GATEWAY_HOST%"=="" set WS_GATEWAY_HOST=0.0.0.0 +if "%WS_GATEWAY_PORT%"=="" set WS_GATEWAY_PORT=8010 +python -m uvicorn app.ws_service:app --host %WS_GATEWAY_HOST% --port %WS_GATEWAY_PORT% diff --git a/video_worker/scripts/run_ws_service.ps1 b/video_worker/scripts/run_ws_service.ps1 new file mode 100644 index 0000000..5c1abf3 --- /dev/null +++ b/video_worker/scripts/run_ws_service.ps1 @@ -0,0 +1,20 @@ +$ErrorActionPreference = "Stop" +$Root = Split-Path -Parent (Split-Path -Parent $MyInvocation.MyCommand.Path) +Set-Location $Root + +if (!(Test-Path .venv)) { throw ".venv not found, run install script first" } +if (!(Test-Path .env)) { throw ".env not found, copy from .env.example" } + +.\.venv\Scripts\Activate.ps1 +Get-Content .env | ForEach-Object { + if ($_ -match "^\s*#") { return } + if ($_ -match "^\s*$") { return } + $parts = $_ -split "=", 2 + if ($parts.Length -eq 2) { + [System.Environment]::SetEnvironmentVariable($parts[0], $parts[1], "Process") + } +} + +$hostValue = if ($env:WS_GATEWAY_HOST) { $env:WS_GATEWAY_HOST } else { "0.0.0.0" } +$portValue = if ($env:WS_GATEWAY_PORT) { $env:WS_GATEWAY_PORT } else { "8010" } +python -m uvicorn app.ws_service:app --host $hostValue --port $portValue diff --git a/video_worker/scripts/run_ws_service.sh b/video_worker/scripts/run_ws_service.sh new file mode 100755 index 0000000..881b0af --- /dev/null +++ b/video_worker/scripts/run_ws_service.sh @@ -0,0 +1,21 @@ +#!/usr/bin/env bash +set -euo pipefail + +ROOT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")/.." && pwd)" +cd "$ROOT_DIR" + +if [ ! -d .venv ]; then + echo "[ERROR] .venv not found, run install script first" + exit 1 +fi +if [ ! -f .env ]; then + echo "[ERROR] .env not found, copy from .env.example" + exit 1 +fi + +source .venv/bin/activate +set -a +source .env +set +a + +python -m uvicorn app.ws_service:app --host "${WS_GATEWAY_HOST:-0.0.0.0}" --port "${WS_GATEWAY_PORT:-8010}" diff --git a/video_worker/scripts/run_wsl_one_click.ps1 b/video_worker/scripts/run_wsl_one_click.ps1 new file mode 100644 index 0000000..ff4cb5c --- /dev/null +++ b/video_worker/scripts/run_wsl_one_click.ps1 @@ -0,0 +1,31 @@ +param( + [string]$Distro = "" +) + +$ErrorActionPreference = "Stop" + +$Root = Split-Path -Parent (Split-Path -Parent $MyInvocation.MyCommand.Path) +Set-Location $Root + +if (!(Get-Command wsl -ErrorAction SilentlyContinue)) { + throw "WSL command not found. Please install WSL first." +} + +# Resolve Windows project path to the same distro that will run the server (mounts differ per distro). +if ([string]::IsNullOrWhiteSpace($Distro)) { + $linuxPath = (wsl -- wslpath -a "$Root").Trim() +} else { + $linuxPath = (wsl -d $Distro -- wslpath -a "$Root").Trim() +} +if ([string]::IsNullOrWhiteSpace($linuxPath)) { + throw "Failed to resolve WSL path for project root: $Root" +} + +# Single argument to bash -lc; quote the whole string so spaces in path are safe. +$bashCommand = "cd '$linuxPath' && chmod +x scripts/one_click_wsl.sh scripts/install_wsl_env.sh scripts/run_server.sh && exec bash scripts/one_click_wsl.sh" + +if ([string]::IsNullOrWhiteSpace($Distro)) { + wsl -- bash -lc "$bashCommand" +} else { + wsl -d $Distro -- bash -lc "$bashCommand" +} diff --git a/video_worker/scripts/stop_edge_dispatch_docker.sh b/video_worker/scripts/stop_edge_dispatch_docker.sh new file mode 100755 index 0000000..eba8e61 --- /dev/null +++ b/video_worker/scripts/stop_edge_dispatch_docker.sh @@ -0,0 +1,7 @@ +#!/usr/bin/env bash +set -euo pipefail + +ROOT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")/.." && pwd)" +cd "$ROOT_DIR" + +docker compose -f docker-compose.edge-dispatch.yml down diff --git a/video_worker/scripts/ws_smoke_test.py b/video_worker/scripts/ws_smoke_test.py new file mode 100644 index 0000000..4bdbbff --- /dev/null +++ b/video_worker/scripts/ws_smoke_test.py @@ -0,0 +1,37 @@ +import asyncio +import json +import sys + +import websockets + +WS_URL = sys.argv[1] if len(sys.argv) > 1 else "ws://127.0.0.1:8010/ws/generate" + + +async def main() -> None: + payload = { + "action": "generate", + "payload": { + "prompt": "a lonely man walking in a rainy neon street, cinematic, handheld camera", + "negative_prompt": "blurry, deformed face, extra limbs, flicker", + "quality_mode": "preview", + "duration_sec": 1, + "width": 320, + "height": 240, + "fps": 8, + "steps": 8, + "seed": 123456, + }, + } + + async with websockets.connect(WS_URL, max_size=2**22) as ws: + await ws.send(json.dumps(payload, ensure_ascii=False)) + while True: + msg = await ws.recv() + data = json.loads(msg) + print(json.dumps(data, ensure_ascii=False)) + if data.get("event") == "result": + break + + +if __name__ == "__main__": + asyncio.run(main())