diff --git a/video_worker/README.md b/video_worker/README.md index a9f0ca5..4f94cc3 100644 --- a/video_worker/README.md +++ b/video_worker/README.md @@ -9,6 +9,20 @@ - `center_dispatch/`:中央调度项目(HTTP 管理 + WS 下发) - `edge_node/`:边缘执行项目(本地推理 + 主动连中心) +### 顶层一键脚本(推荐) + +在 `video_worker/` 根目录直接执行: + +```bash +bash start_center.sh +bash stop_center.sh +bash restart_center.sh + +bash start_edge.sh +bash stop_edge.sh +bash restart_edge.sh +``` + ## 1. 项目说明 - 目标:边缘执行节点,不是完整平台。 @@ -256,12 +270,20 @@ video_worker/ - 边缘上传产物到中心,由中心服务直传 OSS,返回 OSS URL - `GET /devices`(edge_dispatch_service) - 查看在线边缘设备 +- `GET /status`(edge_dispatch_service) + - 查看中央服务运行状态、设备统计、任务/指令统计与最近记录 - `WS /ws/edge/{device_id}`(edge_dispatch_service) - 边缘设备接入通道 +- `WS /ws/test`(edge_dispatch_service) + - WebSocket 连通性测试(支持 `ping`,其余消息回显) - `POST /devices/{device_id}/command`(edge_dispatch_service) - 通过 HTTP 下发设备运维指令(中心自动转 WS) - `GET /commands/{dispatch_id}`(edge_dispatch_service) - 查询设备指令执行状态和结果 +- `POST /frontend/commands`(edge_dispatch_service) + - 前端统一 HTTP 指令入口(中心自动转 WS 到边缘) +- `GET /frontend/records/{dispatch_id}`(edge_dispatch_service) + - 前端统一查询入口(任务/指令都可查) 边缘设备 WS 控制指令(由上游下发到 `edge_device_client.py`): @@ -299,6 +321,40 @@ curl -X POST http://:8020/devices/edge-a4000-01/command \ curl http://:8020/commands/ ``` +前端统一指令入口示例: + +```bash +# 1) 前端触发生成任务(中心转 WS 下发) +curl -X POST http://:8020/frontend/commands \ + -H "Content-Type: application/json" \ + -d '{ + "action": "generate", + "device_id": "edge-a4000-01", + "request": { + "prompt": "a lonely man walking in a rainy neon street", + "negative_prompt": "blurry, flicker", + "quality_mode": "preview", + "duration_sec": 1, + "width": 320, + "height": 240, + "fps": 8, + "steps": 8 + } + }' + +# 2) 前端触发边缘更新代码 +curl -X POST http://:8020/frontend/commands \ + -H "Content-Type: application/json" \ + -d '{ + "action": "update_code", + "device_id": "edge-a4000-01", + "branch": "master" + }' + +# 3) 统一查询记录(dispatch_id 来自上一步响应) +curl http://:8020/frontend/records/ +``` + 参数限制: - `duration_sec`: 1~5 diff --git a/video_worker/app/edge_dispatch_service.py b/video_worker/app/edge_dispatch_service.py index f4dac0a..e65eab1 100644 --- a/video_worker/app/edge_dispatch_service.py +++ b/video_worker/app/edge_dispatch_service.py @@ -2,10 +2,11 @@ import asyncio from dataclasses import dataclass from datetime import datetime, timezone from pathlib import Path -from typing import Any, Optional +from typing import Any, Literal, Optional from uuid import uuid4 from fastapi import FastAPI, File, Form, HTTPException, UploadFile, WebSocket, WebSocketDisconnect +from fastapi.middleware.cors import CORSMiddleware from pydantic import BaseModel from app.oss_client import oss_uploader @@ -17,6 +18,9 @@ def utc_now_iso() -> str: return datetime.now(timezone.utc).isoformat() +SERVICE_STARTED_AT = utc_now_iso() + + class DispatchGenerateRequest(BaseModel): device_id: Optional[str] = None request: GenerateRequest @@ -45,6 +49,15 @@ class DeviceCommandResponse(BaseModel): created_at: str +class FrontendCommandRequest(BaseModel): + action: Literal["generate", "update_code", "restart_service", "ping"] + device_id: Optional[str] = None + request: Optional[GenerateRequest] = None + branch: Optional[str] = None + shell_command: Optional[str] = None + payload: Optional[dict[str, Any]] = None + + @dataclass class EdgeConnection: device_id: str @@ -277,6 +290,13 @@ class EdgeDispatchManager: manager = EdgeDispatchManager() app = FastAPI(title="Edge Dispatch Service", version="0.1.0") +app.add_middleware( + CORSMiddleware, + allow_origins=["*"], + allow_credentials=True, + allow_methods=["*"], + allow_headers=["*"], +) @app.get("/health") @@ -289,6 +309,64 @@ async def health() -> dict[str, Any]: } +@app.get("/status") +async def status() -> dict[str, Any]: + devices = await manager.list_devices() + connected = len(devices) + busy = sum(1 for d in devices if d.get("busy")) + idle = connected - busy + + dispatch_records = list(manager.dispatches.values()) + command_records = list(manager.commands.values()) + + def _count_by_status(records: list[dict[str, Any]]) -> dict[str, int]: + result: dict[str, int] = {} + for rec in records: + s = str(rec.get("status", "UNKNOWN")) + result[s] = result.get(s, 0) + 1 + return result + + recent_dispatches = sorted( + dispatch_records, + key=lambda x: x.get("updated_at", ""), + reverse=True, + )[:5] + recent_commands = sorted( + command_records, + key=lambda x: x.get("updated_at", ""), + reverse=True, + )[:5] + + return { + "service": "edge_dispatch", + "status": "ok", + "started_at": SERVICE_STARTED_AT, + "now": utc_now_iso(), + "config": { + "edge_dispatch_host": settings.edge_dispatch_host, + "edge_dispatch_port": settings.edge_dispatch_port, + "edge_max_dispatch_records": settings.edge_max_dispatch_records, + "oss_enabled": settings.oss_enabled, + }, + "devices": { + "connected": connected, + "busy": busy, + "idle": idle, + "items": devices, + }, + "dispatches": { + "total": len(dispatch_records), + "by_status": _count_by_status(dispatch_records), + "recent": recent_dispatches, + }, + "commands": { + "total": len(command_records), + "by_status": _count_by_status(command_records), + "recent": recent_commands, + }, + } + + @app.get("/devices") async def list_devices() -> dict[str, Any]: return {"devices": await manager.list_devices()} @@ -349,6 +427,55 @@ async def get_command(dispatch_id: str) -> dict[str, Any]: return record +@app.post("/frontend/commands") +async def frontend_command(body: FrontendCommandRequest) -> dict[str, Any]: + """ + Frontend-friendly unified command endpoint. + - action=generate -> dispatches generation task to edge + - action=update_code/restart_service/ping -> sends device command via WS + """ + conn = await manager.select_device(body.device_id) + + if body.action == "generate": + if body.request is None: + raise HTTPException(status_code=400, detail="request is required when action=generate") + record = await manager.create_dispatch(conn, body.request) + return { + "type": "dispatch", + "dispatch_id": record["dispatch_id"], + "device_id": record["device_id"], + "status": record["status"], + "created_at": record["created_at"], + } + + cmd_req = DeviceCommandRequest( + command=body.action, + branch=body.branch, + shell_command=body.shell_command, + payload=body.payload, + ) + record = await manager.create_command(conn, cmd_req) + return { + "type": "command", + "dispatch_id": record["dispatch_id"], + "device_id": record["device_id"], + "command": record["command"], + "status": record["status"], + "created_at": record["created_at"], + } + + +@app.get("/frontend/records/{dispatch_id}") +async def frontend_record(dispatch_id: str) -> dict[str, Any]: + record = manager.dispatches.get(dispatch_id) + if record is not None: + return {"type": "dispatch", **record} + record = manager.commands.get(dispatch_id) + if record is not None: + return {"type": "command", **record} + raise HTTPException(status_code=404, detail=f"record not found: {dispatch_id}") + + @app.post("/dispatch/{dispatch_id}/artifacts") async def upload_artifacts( dispatch_id: str, @@ -410,3 +537,25 @@ async def edge_socket(websocket: WebSocket, device_id: str) -> None: except WebSocketDisconnect: await manager.mark_disconnect_failed(device_id) await manager.unregister(device_id) + + +@app.websocket("/ws/test") +async def ws_test(websocket: WebSocket) -> None: + await websocket.accept() + await websocket.send_json({"event": "connected", "service": "edge_dispatch", "ts": utc_now_iso()}) + try: + while True: + msg = await websocket.receive_json() + action = str(msg.get("action", "")).lower() + if action == "ping": + await websocket.send_json({"event": "pong", "ts": utc_now_iso()}) + else: + await websocket.send_json( + { + "event": "echo", + "received": msg, + "ts": utc_now_iso(), + } + ) + except WebSocketDisconnect: + return diff --git a/video_worker/requirements-edge-dispatch.txt b/video_worker/requirements-edge-dispatch.txt index b3eb7d9..29e6d77 100644 --- a/video_worker/requirements-edge-dispatch.txt +++ b/video_worker/requirements-edge-dispatch.txt @@ -5,4 +5,5 @@ uvicorn[standard]==0.35.0 pydantic==2.11.7 pydantic-settings==2.10.1 python-dotenv==1.1.1 +python-multipart==0.0.20 oss2==2.18.5 diff --git a/video_worker/restart_center.sh b/video_worker/restart_center.sh new file mode 100644 index 0000000..8aed1b9 --- /dev/null +++ b/video_worker/restart_center.sh @@ -0,0 +1,9 @@ +#!/usr/bin/env bash +set -euo pipefail + +ROOT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)" +cd "$ROOT_DIR" + +bash center_dispatch/scripts/stop.sh || true +sleep 1 +exec bash center_dispatch/scripts/start.sh diff --git a/video_worker/restart_edge.sh b/video_worker/restart_edge.sh new file mode 100644 index 0000000..9523ac0 --- /dev/null +++ b/video_worker/restart_edge.sh @@ -0,0 +1,7 @@ +#!/usr/bin/env bash +set -euo pipefail + +ROOT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)" +cd "$ROOT_DIR" + +exec bash edge_node/scripts/restart.sh diff --git a/video_worker/start_center.sh b/video_worker/start_center.sh new file mode 100644 index 0000000..eca3da5 --- /dev/null +++ b/video_worker/start_center.sh @@ -0,0 +1,7 @@ +#!/usr/bin/env bash +set -euo pipefail + +ROOT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)" +cd "$ROOT_DIR" + +exec bash center_dispatch/scripts/start.sh diff --git a/video_worker/start_edge.sh b/video_worker/start_edge.sh new file mode 100644 index 0000000..257abec --- /dev/null +++ b/video_worker/start_edge.sh @@ -0,0 +1,7 @@ +#!/usr/bin/env bash +set -euo pipefail + +ROOT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)" +cd "$ROOT_DIR" + +exec bash edge_node/scripts/start.sh diff --git a/video_worker/stop_center.sh b/video_worker/stop_center.sh new file mode 100644 index 0000000..4edda8e --- /dev/null +++ b/video_worker/stop_center.sh @@ -0,0 +1,7 @@ +#!/usr/bin/env bash +set -euo pipefail + +ROOT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)" +cd "$ROOT_DIR" + +exec bash center_dispatch/scripts/stop.sh diff --git a/video_worker/stop_edge.sh b/video_worker/stop_edge.sh new file mode 100644 index 0000000..d4d2570 --- /dev/null +++ b/video_worker/stop_edge.sh @@ -0,0 +1,7 @@ +#!/usr/bin/env bash +set -euo pipefail + +ROOT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)" +cd "$ROOT_DIR" + +exec bash edge_node/scripts/stop.sh