fix: 修复bug和移动脚本

This commit is contained in:
Daniel
2026-04-07 18:14:42 +08:00
parent 5ae8d50298
commit 29195166fa
9 changed files with 251 additions and 1 deletions

View File

@@ -9,6 +9,20 @@
- `center_dispatch/`中央调度项目HTTP 管理 + WS 下发) - `center_dispatch/`中央调度项目HTTP 管理 + WS 下发)
- `edge_node/`:边缘执行项目(本地推理 + 主动连中心) - `edge_node/`:边缘执行项目(本地推理 + 主动连中心)
### 顶层一键脚本(推荐)
`video_worker/` 根目录直接执行:
```bash
bash start_center.sh
bash stop_center.sh
bash restart_center.sh
bash start_edge.sh
bash stop_edge.sh
bash restart_edge.sh
```
## 1. 项目说明 ## 1. 项目说明
- 目标:边缘执行节点,不是完整平台。 - 目标:边缘执行节点,不是完整平台。
@@ -256,12 +270,20 @@ video_worker/
- 边缘上传产物到中心,由中心服务直传 OSS返回 OSS URL - 边缘上传产物到中心,由中心服务直传 OSS返回 OSS URL
- `GET /devices`edge_dispatch_service - `GET /devices`edge_dispatch_service
- 查看在线边缘设备 - 查看在线边缘设备
- `GET /status`edge_dispatch_service
- 查看中央服务运行状态、设备统计、任务/指令统计与最近记录
- `WS /ws/edge/{device_id}`edge_dispatch_service - `WS /ws/edge/{device_id}`edge_dispatch_service
- 边缘设备接入通道 - 边缘设备接入通道
- `WS /ws/test`edge_dispatch_service
- WebSocket 连通性测试(支持 `ping`,其余消息回显)
- `POST /devices/{device_id}/command`edge_dispatch_service - `POST /devices/{device_id}/command`edge_dispatch_service
- 通过 HTTP 下发设备运维指令(中心自动转 WS - 通过 HTTP 下发设备运维指令(中心自动转 WS
- `GET /commands/{dispatch_id}`edge_dispatch_service - `GET /commands/{dispatch_id}`edge_dispatch_service
- 查询设备指令执行状态和结果 - 查询设备指令执行状态和结果
- `POST /frontend/commands`edge_dispatch_service
- 前端统一 HTTP 指令入口(中心自动转 WS 到边缘)
- `GET /frontend/records/{dispatch_id}`edge_dispatch_service
- 前端统一查询入口(任务/指令都可查)
边缘设备 WS 控制指令(由上游下发到 `edge_device_client.py` 边缘设备 WS 控制指令(由上游下发到 `edge_device_client.py`
@@ -299,6 +321,40 @@ curl -X POST http://<dispatch-host>:8020/devices/edge-a4000-01/command \
curl http://<dispatch-host>:8020/commands/<dispatch_id> curl http://<dispatch-host>:8020/commands/<dispatch_id>
``` ```
前端统一指令入口示例:
```bash
# 1) 前端触发生成任务(中心转 WS 下发)
curl -X POST http://<dispatch-host>:8020/frontend/commands \
-H "Content-Type: application/json" \
-d '{
"action": "generate",
"device_id": "edge-a4000-01",
"request": {
"prompt": "a lonely man walking in a rainy neon street",
"negative_prompt": "blurry, flicker",
"quality_mode": "preview",
"duration_sec": 1,
"width": 320,
"height": 240,
"fps": 8,
"steps": 8
}
}'
# 2) 前端触发边缘更新代码
curl -X POST http://<dispatch-host>:8020/frontend/commands \
-H "Content-Type: application/json" \
-d '{
"action": "update_code",
"device_id": "edge-a4000-01",
"branch": "master"
}'
# 3) 统一查询记录dispatch_id 来自上一步响应)
curl http://<dispatch-host>:8020/frontend/records/<dispatch_id>
```
参数限制: 参数限制:
- `duration_sec`: 1~5 - `duration_sec`: 1~5

View File

@@ -2,10 +2,11 @@ import asyncio
from dataclasses import dataclass from dataclasses import dataclass
from datetime import datetime, timezone from datetime import datetime, timezone
from pathlib import Path from pathlib import Path
from typing import Any, Optional from typing import Any, Literal, Optional
from uuid import uuid4 from uuid import uuid4
from fastapi import FastAPI, File, Form, HTTPException, UploadFile, WebSocket, WebSocketDisconnect from fastapi import FastAPI, File, Form, HTTPException, UploadFile, WebSocket, WebSocketDisconnect
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel from pydantic import BaseModel
from app.oss_client import oss_uploader from app.oss_client import oss_uploader
@@ -17,6 +18,9 @@ def utc_now_iso() -> str:
return datetime.now(timezone.utc).isoformat() return datetime.now(timezone.utc).isoformat()
SERVICE_STARTED_AT = utc_now_iso()
class DispatchGenerateRequest(BaseModel): class DispatchGenerateRequest(BaseModel):
device_id: Optional[str] = None device_id: Optional[str] = None
request: GenerateRequest request: GenerateRequest
@@ -45,6 +49,15 @@ class DeviceCommandResponse(BaseModel):
created_at: str created_at: str
class FrontendCommandRequest(BaseModel):
action: Literal["generate", "update_code", "restart_service", "ping"]
device_id: Optional[str] = None
request: Optional[GenerateRequest] = None
branch: Optional[str] = None
shell_command: Optional[str] = None
payload: Optional[dict[str, Any]] = None
@dataclass @dataclass
class EdgeConnection: class EdgeConnection:
device_id: str device_id: str
@@ -277,6 +290,13 @@ class EdgeDispatchManager:
manager = EdgeDispatchManager() manager = EdgeDispatchManager()
app = FastAPI(title="Edge Dispatch Service", version="0.1.0") app = FastAPI(title="Edge Dispatch Service", version="0.1.0")
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
@app.get("/health") @app.get("/health")
@@ -289,6 +309,64 @@ async def health() -> dict[str, Any]:
} }
@app.get("/status")
async def status() -> dict[str, Any]:
devices = await manager.list_devices()
connected = len(devices)
busy = sum(1 for d in devices if d.get("busy"))
idle = connected - busy
dispatch_records = list(manager.dispatches.values())
command_records = list(manager.commands.values())
def _count_by_status(records: list[dict[str, Any]]) -> dict[str, int]:
result: dict[str, int] = {}
for rec in records:
s = str(rec.get("status", "UNKNOWN"))
result[s] = result.get(s, 0) + 1
return result
recent_dispatches = sorted(
dispatch_records,
key=lambda x: x.get("updated_at", ""),
reverse=True,
)[:5]
recent_commands = sorted(
command_records,
key=lambda x: x.get("updated_at", ""),
reverse=True,
)[:5]
return {
"service": "edge_dispatch",
"status": "ok",
"started_at": SERVICE_STARTED_AT,
"now": utc_now_iso(),
"config": {
"edge_dispatch_host": settings.edge_dispatch_host,
"edge_dispatch_port": settings.edge_dispatch_port,
"edge_max_dispatch_records": settings.edge_max_dispatch_records,
"oss_enabled": settings.oss_enabled,
},
"devices": {
"connected": connected,
"busy": busy,
"idle": idle,
"items": devices,
},
"dispatches": {
"total": len(dispatch_records),
"by_status": _count_by_status(dispatch_records),
"recent": recent_dispatches,
},
"commands": {
"total": len(command_records),
"by_status": _count_by_status(command_records),
"recent": recent_commands,
},
}
@app.get("/devices") @app.get("/devices")
async def list_devices() -> dict[str, Any]: async def list_devices() -> dict[str, Any]:
return {"devices": await manager.list_devices()} return {"devices": await manager.list_devices()}
@@ -349,6 +427,55 @@ async def get_command(dispatch_id: str) -> dict[str, Any]:
return record return record
@app.post("/frontend/commands")
async def frontend_command(body: FrontendCommandRequest) -> dict[str, Any]:
"""
Frontend-friendly unified command endpoint.
- action=generate -> dispatches generation task to edge
- action=update_code/restart_service/ping -> sends device command via WS
"""
conn = await manager.select_device(body.device_id)
if body.action == "generate":
if body.request is None:
raise HTTPException(status_code=400, detail="request is required when action=generate")
record = await manager.create_dispatch(conn, body.request)
return {
"type": "dispatch",
"dispatch_id": record["dispatch_id"],
"device_id": record["device_id"],
"status": record["status"],
"created_at": record["created_at"],
}
cmd_req = DeviceCommandRequest(
command=body.action,
branch=body.branch,
shell_command=body.shell_command,
payload=body.payload,
)
record = await manager.create_command(conn, cmd_req)
return {
"type": "command",
"dispatch_id": record["dispatch_id"],
"device_id": record["device_id"],
"command": record["command"],
"status": record["status"],
"created_at": record["created_at"],
}
@app.get("/frontend/records/{dispatch_id}")
async def frontend_record(dispatch_id: str) -> dict[str, Any]:
record = manager.dispatches.get(dispatch_id)
if record is not None:
return {"type": "dispatch", **record}
record = manager.commands.get(dispatch_id)
if record is not None:
return {"type": "command", **record}
raise HTTPException(status_code=404, detail=f"record not found: {dispatch_id}")
@app.post("/dispatch/{dispatch_id}/artifacts") @app.post("/dispatch/{dispatch_id}/artifacts")
async def upload_artifacts( async def upload_artifacts(
dispatch_id: str, dispatch_id: str,
@@ -410,3 +537,25 @@ async def edge_socket(websocket: WebSocket, device_id: str) -> None:
except WebSocketDisconnect: except WebSocketDisconnect:
await manager.mark_disconnect_failed(device_id) await manager.mark_disconnect_failed(device_id)
await manager.unregister(device_id) await manager.unregister(device_id)
@app.websocket("/ws/test")
async def ws_test(websocket: WebSocket) -> None:
await websocket.accept()
await websocket.send_json({"event": "connected", "service": "edge_dispatch", "ts": utc_now_iso()})
try:
while True:
msg = await websocket.receive_json()
action = str(msg.get("action", "")).lower()
if action == "ping":
await websocket.send_json({"event": "pong", "ts": utc_now_iso()})
else:
await websocket.send_json(
{
"event": "echo",
"received": msg,
"ts": utc_now_iso(),
}
)
except WebSocketDisconnect:
return

View File

@@ -5,4 +5,5 @@ uvicorn[standard]==0.35.0
pydantic==2.11.7 pydantic==2.11.7
pydantic-settings==2.10.1 pydantic-settings==2.10.1
python-dotenv==1.1.1 python-dotenv==1.1.1
python-multipart==0.0.20
oss2==2.18.5 oss2==2.18.5

View File

@@ -0,0 +1,9 @@
#!/usr/bin/env bash
set -euo pipefail
ROOT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
cd "$ROOT_DIR"
bash center_dispatch/scripts/stop.sh || true
sleep 1
exec bash center_dispatch/scripts/start.sh

View File

@@ -0,0 +1,7 @@
#!/usr/bin/env bash
set -euo pipefail
ROOT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
cd "$ROOT_DIR"
exec bash edge_node/scripts/restart.sh

View File

@@ -0,0 +1,7 @@
#!/usr/bin/env bash
set -euo pipefail
ROOT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
cd "$ROOT_DIR"
exec bash center_dispatch/scripts/start.sh

View File

@@ -0,0 +1,7 @@
#!/usr/bin/env bash
set -euo pipefail
ROOT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
cd "$ROOT_DIR"
exec bash edge_node/scripts/start.sh

View File

@@ -0,0 +1,7 @@
#!/usr/bin/env bash
set -euo pipefail
ROOT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
cd "$ROOT_DIR"
exec bash center_dispatch/scripts/stop.sh

View File

@@ -0,0 +1,7 @@
#!/usr/bin/env bash
set -euo pipefail
ROOT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
cd "$ROOT_DIR"
exec bash edge_node/scripts/stop.sh