diff --git a/video_worker/.env.example b/video_worker/.env.example index aeaa03c..f258dfa 100644 --- a/video_worker/.env.example +++ b/video_worker/.env.example @@ -6,6 +6,8 @@ WORKER_BASE_URL=http://127.0.0.1:8000 WS_POLL_INTERVAL_SEC=1.0 DISPATCH_WS_URL=ws://127.0.0.1:8060/ws/edge/edge-a4000-01 EDGE_POLL_INTERVAL_SEC=1.0 +EDGE_ALLOW_REMOTE_UPDATE=true +EDGE_ALLOW_REMOTE_RESTART=true EDGE_DISPATCH_HOST=0.0.0.0 EDGE_DISPATCH_PORT=8020 EDGE_MAX_DISPATCH_RECORDS=2000 diff --git a/video_worker/README.md b/video_worker/README.md index 953ef0c..dc6ed78 100644 --- a/video_worker/README.md +++ b/video_worker/README.md @@ -76,6 +76,34 @@ cd video_worker bash scripts/start_edge_device_local.sh ``` +Windows + WSL 一键边缘设备启动(推荐): + +```powershell +cd video_worker +.\scripts\edge_device_wsl.ps1 -Action start +``` + +常用操作: + +```powershell +.\scripts\edge_device_wsl.ps1 -Action status +.\scripts\edge_device_wsl.ps1 -Action restart +.\scripts\edge_device_wsl.ps1 -Action stop +``` + +多发行版时可指定: + +```powershell +.\scripts\edge_device_wsl.ps1 -Action start -Distro Ubuntu-22.04 +``` + +边缘设备停止 / 重启: + +```bash +bash scripts/stop_edge_device_local.sh +bash scripts/restart_edge_device_local.sh +``` + 独立 WS 网关服务(远程推荐): ```bash @@ -223,6 +251,46 @@ video_worker/ - 查看在线边缘设备 - `WS /ws/edge/{device_id}`(edge_dispatch_service) - 边缘设备接入通道 +- `POST /devices/{device_id}/command`(edge_dispatch_service) + - 通过 HTTP 下发设备运维指令(中心自动转 WS) +- `GET /commands/{dispatch_id}`(edge_dispatch_service) + - 查询设备指令执行状态和结果 + +边缘设备 WS 控制指令(由上游下发到 `edge_device_client.py`): + +- `generate`: 下发生成任务 +- `update_code`: 拉取最新代码(默认执行 `git fetch --all && git checkout && git pull --ff-only origin `,可通过 `command` 自定义) +- `restart_service`: 执行边缘本地重启脚本 +- `ping`: 心跳探活,设备回 `pong` + +HTTP 下发设备指令示例(推荐上游系统使用): + +```bash +# 1) 更新边缘设备代码 +curl -X POST http://:8020/devices/edge-a4000-01/command \ + -H "Content-Type: application/json" \ + -d '{ + "command": "update_code", + "branch": "master" + }' + +# 2) 重启边缘设备服务 +curl -X POST http://:8020/devices/edge-a4000-01/command \ + -H "Content-Type: application/json" \ + -d '{ + "command": "restart_service" + }' + +# 3) 设备心跳检查 +curl -X POST http://:8020/devices/edge-a4000-01/command \ + -H "Content-Type: application/json" \ + -d '{ + "command": "ping" + }' + +# 4) 查询指令执行状态(使用上一步返回的 dispatch_id) +curl http://:8020/commands/ +``` 参数限制: diff --git a/video_worker/app/edge_dispatch_service.py b/video_worker/app/edge_dispatch_service.py index c98ff02..f4dac0a 100644 --- a/video_worker/app/edge_dispatch_service.py +++ b/video_worker/app/edge_dispatch_service.py @@ -29,6 +29,22 @@ class DispatchResponse(BaseModel): created_at: str +class DeviceCommandRequest(BaseModel): + command: str + dispatch_id: Optional[str] = None + branch: Optional[str] = None + shell_command: Optional[str] = None + payload: Optional[dict[str, Any]] = None + + +class DeviceCommandResponse(BaseModel): + dispatch_id: str + device_id: str + command: str + status: str + created_at: str + + @dataclass class EdgeConnection: device_id: str @@ -42,6 +58,7 @@ class EdgeDispatchManager: def __init__(self) -> None: self.connections: dict[str, EdgeConnection] = {} self.dispatches: dict[str, dict[str, Any]] = {} + self.commands: dict[str, dict[str, Any]] = {} self.lock = asyncio.Lock() async def register(self, device_id: str, websocket: WebSocket) -> EdgeConnection: @@ -130,17 +147,68 @@ class EdgeDispatchManager: raise return record + async def create_command(self, conn: EdgeConnection, body: DeviceCommandRequest) -> dict[str, Any]: + await self._prune_if_needed() + command = body.command.strip().lower() + allowed = {"update_code", "restart_service", "ping"} + if command not in allowed: + raise HTTPException(status_code=400, detail=f"unsupported command: {command}") + + dispatch_id = body.dispatch_id or uuid4().hex + now = utc_now_iso() + record = { + "dispatch_id": dispatch_id, + "device_id": conn.device_id, + "command": command, + "status": "DISPATCHED", + "created_at": now, + "updated_at": now, + "request": body.model_dump(), + "result": None, + "error": None, + } + + payload: dict[str, Any] = { + "event": command, + "dispatch_id": dispatch_id, + } + if body.branch: + payload["branch"] = body.branch + if body.shell_command: + payload["command"] = body.shell_command + if body.payload: + payload.update(body.payload) + + async with self.lock: + self.commands[dispatch_id] = record + target = self.connections.get(conn.device_id) + if target is None: + raise HTTPException(status_code=404, detail=f"device disconnected: {conn.device_id}") + target.last_seen = now + + try: + await conn.websocket.send_json(payload) + except Exception as exc: + async with self.lock: + record["status"] = "FAILED" + record["error"] = f"command send failed: {exc}" + record["updated_at"] = utc_now_iso() + raise + return record + async def _prune_if_needed(self) -> None: max_records = max(100, int(settings.edge_max_dispatch_records)) async with self.lock: - total = len(self.dispatches) + total = len(self.dispatches) + len(self.commands) if total < max_records: return over = total - max_records + 1 done = [v for v in self.dispatches.values() if v.get("status") in {"SUCCEEDED", "FAILED"}] + done.extend([v for v in self.commands.values() if v.get("status") in {"SUCCEEDED", "FAILED"}]) done.sort(key=lambda x: x.get("updated_at", "")) for rec in done[:over]: self.dispatches.pop(rec["dispatch_id"], None) + self.commands.pop(rec["dispatch_id"], None) async def mark_event(self, device_id: str, event: dict[str, Any]) -> None: now = utc_now_iso() @@ -154,6 +222,8 @@ class EdgeDispatchManager: return record = self.dispatches.get(dispatch_id) + if record is None: + record = self.commands.get(dispatch_id) if record is None: return @@ -175,6 +245,18 @@ class EdgeDispatchManager: record["error"] = event.get("error") if conn: conn.busy = False + elif evt == "command_status": + status_val = event.get("status") or "RUNNING" + record["status"] = status_val + elif evt == "command_result": + status_val = event.get("status") or "SUCCEEDED" + record["status"] = status_val + record["result"] = event + if status_val == "FAILED": + record["error"] = event.get("error") or event.get("stderr") + elif evt == "pong": + record["status"] = "SUCCEEDED" + record["result"] = event record["updated_at"] = now @@ -186,6 +268,11 @@ class EdgeDispatchManager: record["status"] = "FAILED" record["error"] = f"device disconnected: {device_id}" record["updated_at"] = now + for record in self.commands.values(): + if record["device_id"] == device_id and record["status"] in {"DISPATCHED", "RUNNING", "PENDING"}: + record["status"] = "FAILED" + record["error"] = f"device disconnected: {device_id}" + record["updated_at"] = now manager = EdgeDispatchManager() @@ -234,6 +321,34 @@ async def get_dispatch(dispatch_id: str) -> dict[str, Any]: return record +@app.post("/devices/{device_id}/command", response_model=DeviceCommandResponse) +async def device_command(device_id: str, body: DeviceCommandRequest) -> DeviceCommandResponse: + conn = await manager.select_device(device_id) + try: + record = await manager.create_command(conn, body) + except WebSocketDisconnect as exc: + await manager.unregister(conn.device_id) + raise HTTPException(status_code=503, detail=f"device disconnected during command: {conn.device_id}") from exc + except RuntimeError as exc: + raise HTTPException(status_code=503, detail=str(exc)) from exc + + return DeviceCommandResponse( + dispatch_id=record["dispatch_id"], + device_id=record["device_id"], + command=record["command"], + status=record["status"], + created_at=record["created_at"], + ) + + +@app.get("/commands/{dispatch_id}") +async def get_command(dispatch_id: str) -> dict[str, Any]: + record = manager.commands.get(dispatch_id) + if record is None: + raise HTTPException(status_code=404, detail=f"command not found: {dispatch_id}") + return record + + @app.post("/dispatch/{dispatch_id}/artifacts") async def upload_artifacts( dispatch_id: str, diff --git a/video_worker/scripts/edge_device_client.py b/video_worker/scripts/edge_device_client.py index 999a3d6..da4859b 100644 --- a/video_worker/scripts/edge_device_client.py +++ b/video_worker/scripts/edge_device_client.py @@ -2,6 +2,7 @@ import asyncio import json import os from pathlib import Path +import subprocess import sys import time from urllib.parse import urlparse @@ -13,6 +14,12 @@ DISPATCH_WS_URL = os.getenv("DISPATCH_WS_URL", "ws://127.0.0.1:8020/ws/edge/edge WORKER_BASE_URL = os.getenv("WORKER_BASE_URL", "http://127.0.0.1:8000") POLL_INTERVAL = float(os.getenv("EDGE_POLL_INTERVAL_SEC", "1.0")) DISPATCH_HTTP_BASE = os.getenv("DISPATCH_HTTP_BASE", "") +ALLOW_REMOTE_UPDATE = os.getenv("EDGE_ALLOW_REMOTE_UPDATE", "true").lower() in {"1", "true", "yes", "on"} +ALLOW_REMOTE_RESTART = os.getenv("EDGE_ALLOW_REMOTE_RESTART", "true").lower() in {"1", "true", "yes", "on"} + +SCRIPT_DIR = Path(__file__).resolve().parent +ROOT_DIR = SCRIPT_DIR.parent +RESTART_SCRIPT = ROOT_DIR / "scripts" / "restart_edge_device_local.sh" if len(sys.argv) > 1: DISPATCH_WS_URL = sys.argv[1] @@ -41,6 +48,18 @@ def worker_get(path: str): return r.json() +def run_shell(command: str, timeout_sec: int = 1200) -> tuple[int, str, str]: + proc = subprocess.run( + command, + cwd=str(ROOT_DIR), + shell=True, + text=True, + capture_output=True, + timeout=timeout_sec, + ) + return proc.returncode, proc.stdout.strip(), proc.stderr.strip() + + def upload_artifacts(dispatch_id: str, task_id: str, result: dict) -> dict: candidate_fields = ["video_path", "first_frame_path", "metadata_path", "log_path"] existing_paths = [] @@ -78,6 +97,12 @@ def upload_artifacts(dispatch_id: str, task_id: str, result: dict) -> dict: fh.close() +def _short_text(text: str, limit: int = 1500) -> str: + if len(text) <= limit: + return text + return text[:limit] + "...(truncated)" + + async def handle_generate(ws, data: dict): dispatch_id = data["dispatch_id"] req = data["request"] @@ -114,13 +139,126 @@ async def handle_generate(ws, data: dict): result_payload["artifact_urls"] = artifact_urls else: result_payload["error"] = result.get("error") - await ws.send( - json.dumps(result_payload, ensure_ascii=False) - ) + await ws.send(json.dumps(result_payload, ensure_ascii=False)) return await asyncio.sleep(POLL_INTERVAL) +async def handle_update_code(ws, data: dict): + dispatch_id = data.get("dispatch_id", "") + if not ALLOW_REMOTE_UPDATE: + await ws.send( + json.dumps( + { + "event": "command_result", + "dispatch_id": dispatch_id, + "command": "update_code", + "status": "FAILED", + "error": "EDGE_ALLOW_REMOTE_UPDATE=false", + }, + ensure_ascii=False, + ) + ) + return + + branch = data.get("branch", "master") + git_command = data.get("command") or f"git fetch --all && git checkout {branch} && git pull --ff-only origin {branch}" + + await ws.send( + json.dumps( + {"event": "command_status", "dispatch_id": dispatch_id, "command": "update_code", "status": "RUNNING"}, + ensure_ascii=False, + ) + ) + + code, out, err = await asyncio.to_thread(run_shell, git_command, 1800) + payload = { + "event": "command_result", + "dispatch_id": dispatch_id, + "command": "update_code", + "status": "SUCCEEDED" if code == 0 else "FAILED", + "exit_code": code, + "stdout": _short_text(out), + "stderr": _short_text(err), + } + await ws.send(json.dumps(payload, ensure_ascii=False)) + + +async def handle_restart_service(ws, data: dict): + dispatch_id = data.get("dispatch_id", "") + if not ALLOW_REMOTE_RESTART: + await ws.send( + json.dumps( + { + "event": "command_result", + "dispatch_id": dispatch_id, + "command": "restart_service", + "status": "FAILED", + "error": "EDGE_ALLOW_REMOTE_RESTART=false", + }, + ensure_ascii=False, + ) + ) + return + + if not RESTART_SCRIPT.exists(): + await ws.send( + json.dumps( + { + "event": "command_result", + "dispatch_id": dispatch_id, + "command": "restart_service", + "status": "FAILED", + "error": f"restart script missing: {RESTART_SCRIPT}", + }, + ensure_ascii=False, + ) + ) + return + + await ws.send( + json.dumps( + {"event": "command_status", "dispatch_id": dispatch_id, "command": "restart_service", "status": "RUNNING"}, + ensure_ascii=False, + ) + ) + await ws.send( + json.dumps( + { + "event": "command_result", + "dispatch_id": dispatch_id, + "command": "restart_service", + "status": "SUCCEEDED", + "message": "restart script launched", + }, + ensure_ascii=False, + ) + ) + + subprocess.Popen( + ["bash", str(RESTART_SCRIPT)], + cwd=str(ROOT_DIR), + stdout=subprocess.DEVNULL, + stderr=subprocess.DEVNULL, + start_new_session=True, + ) + raise SystemExit(0) + + +async def handle_ping(ws, data: dict): + await ws.send( + json.dumps( + { + "event": "pong", + "dispatch_id": data.get("dispatch_id", ""), + "status": "ok", + "worker_base_url": WORKER_BASE_URL, + }, + ensure_ascii=False, + ) + ) + + async def main() -> None: while True: try: @@ -132,8 +270,16 @@ async def main() -> None: event = data.get("event") if event == "generate": await handle_generate(ws, data) + elif event == "update_code": + await handle_update_code(ws, data) + elif event == "restart_service": + await handle_restart_service(ws, data) + elif event == "ping": + await handle_ping(ws, data) elif event == "registered": print("registered", data) + except SystemExit: + return except Exception as exc: print("connection error, retry in 3s:", exc) time.sleep(3) diff --git a/video_worker/scripts/edge_device_wsl.ps1 b/video_worker/scripts/edge_device_wsl.ps1 new file mode 100644 index 0000000..9b1d571 --- /dev/null +++ b/video_worker/scripts/edge_device_wsl.ps1 @@ -0,0 +1,54 @@ +param( + [ValidateSet("start", "stop", "restart", "status")] + [string]$Action = "start", + [string]$Distro = "" +) + +$ErrorActionPreference = "Stop" + +$Root = Split-Path -Parent (Split-Path -Parent $MyInvocation.MyCommand.Path) +Set-Location $Root + +if (!(Get-Command wsl -ErrorAction SilentlyContinue)) { + throw "WSL command not found. Please install WSL first." +} + +if ([string]::IsNullOrWhiteSpace($Distro)) { + $linuxPath = (wsl -- wslpath -a "$Root").Trim() +} else { + $linuxPath = (wsl -d $Distro -- wslpath -a "$Root").Trim() +} +if ([string]::IsNullOrWhiteSpace($linuxPath)) { + throw "Failed to resolve WSL path for project root: $Root" +} + +switch ($Action) { + "start" { $targetScript = "scripts/start_edge_device_local.sh" } + "stop" { $targetScript = "scripts/stop_edge_device_local.sh" } + "restart" { $targetScript = "scripts/restart_edge_device_local.sh" } + "status" { $targetScript = "" } +} + +if ($Action -eq "status") { + $bashCommand = @" +cd '$linuxPath' && \ +if [ -f runtime/pids/worker.pid ] && kill -0 `$(cat runtime/pids/worker.pid) >/dev/null 2>&1; then + echo "[OK] worker running pid=`$(cat runtime/pids/worker.pid)" +else + echo "[INFO] worker not running" +fi && \ +if [ -f runtime/pids/edge_client.pid ] && kill -0 `$(cat runtime/pids/edge_client.pid) >/dev/null 2>&1; then + echo "[OK] edge_client running pid=`$(cat runtime/pids/edge_client.pid)" +else + echo "[INFO] edge_client not running" +fi +"@ +} else { + $bashCommand = "cd '$linuxPath' && chmod +x scripts/install_wsl_env.sh scripts/start_edge_device_local.sh scripts/stop_edge_device_local.sh scripts/restart_edge_device_local.sh && bash $targetScript" +} + +if ([string]::IsNullOrWhiteSpace($Distro)) { + wsl -- bash -lc "$bashCommand" +} else { + wsl -d $Distro -- bash -lc "$bashCommand" +} diff --git a/video_worker/scripts/restart_edge_device_local.sh b/video_worker/scripts/restart_edge_device_local.sh new file mode 100644 index 0000000..981485a --- /dev/null +++ b/video_worker/scripts/restart_edge_device_local.sh @@ -0,0 +1,9 @@ +#!/usr/bin/env bash +set -euo pipefail + +ROOT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")/.." && pwd)" +cd "$ROOT_DIR" + +bash scripts/stop_edge_device_local.sh +sleep 1 +bash scripts/start_edge_device_local.sh diff --git a/video_worker/scripts/stop_edge_device_local.sh b/video_worker/scripts/stop_edge_device_local.sh new file mode 100644 index 0000000..6c49253 --- /dev/null +++ b/video_worker/scripts/stop_edge_device_local.sh @@ -0,0 +1,41 @@ +#!/usr/bin/env bash +set -euo pipefail + +ROOT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")/.." && pwd)" +cd "$ROOT_DIR" + +PID_DIR="runtime/pids" +WORKER_PID_FILE="${PID_DIR}/worker.pid" +EDGE_CLIENT_PID_FILE="${PID_DIR}/edge_client.pid" + +stop_by_pid_file() { + local name="$1" + local pid_file="$2" + if [ ! -f "$pid_file" ]; then + echo "[INFO] ${name} not running (no pid file)" + return + fi + + local pid + pid="$(cat "$pid_file" 2>/dev/null || true)" + if [ -z "$pid" ]; then + rm -f "$pid_file" + echo "[INFO] ${name} pid file empty, cleaned" + return + fi + + if kill -0 "$pid" >/dev/null 2>&1; then + kill "$pid" >/dev/null 2>&1 || true + sleep 1 + if kill -0 "$pid" >/dev/null 2>&1; then + kill -9 "$pid" >/dev/null 2>&1 || true + fi + echo "[OK] ${name} stopped (pid ${pid})" + else + echo "[INFO] ${name} already stopped (stale pid ${pid})" + fi + rm -f "$pid_file" +} + +stop_by_pid_file "edge client" "$EDGE_CLIENT_PID_FILE" +stop_by_pid_file "worker" "$WORKER_PID_FILE"