From f529aa327917a6ee28da9a45397e4ec909fb99a4 Mon Sep 17 00:00:00 2001 From: Daniel Date: Tue, 7 Apr 2026 01:34:49 +0800 Subject: [PATCH] =?UTF-8?q?fix:=20=E6=96=B0=E5=A2=9E=E6=95=B0=E6=8D=AE?= =?UTF-8?q?=E6=A8=A1=E5=9D=97?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- video_worker/.env.center.example | 15 ++++ video_worker/.env.example | 8 ++ video_worker/README.md | 21 ++++++ video_worker/app/edge_dispatch_service.py | 67 ++++++++++++++++- video_worker/app/oss_client.py | 61 ++++++++++++++++ video_worker/app/settings.py | 9 +++ .../docker-compose.center-dispatch.yml | 22 ++++++ video_worker/requirements.txt | 1 + video_worker/scripts/edge_device_client.py | 73 ++++++++++++++++--- video_worker/scripts/start_center_dispatch.sh | 39 ++++++++++ video_worker/scripts/stop_center_dispatch.sh | 13 ++++ 11 files changed, 319 insertions(+), 10 deletions(-) create mode 100644 video_worker/.env.center.example create mode 100644 video_worker/app/oss_client.py create mode 100644 video_worker/docker-compose.center-dispatch.yml create mode 100755 video_worker/scripts/start_center_dispatch.sh create mode 100755 video_worker/scripts/stop_center_dispatch.sh diff --git a/video_worker/.env.center.example b/video_worker/.env.center.example new file mode 100644 index 0000000..1e429cf --- /dev/null +++ b/video_worker/.env.center.example @@ -0,0 +1,15 @@ +# Center dispatch service only +EDGE_DISPATCH_HOST=0.0.0.0 +EDGE_DISPATCH_PORT=8060 +EDGE_MAX_DISPATCH_RECORDS=2000 + +# OSS (required in production) +OSS_ENABLED=true +OSS_ENDPOINT=https://oss-cn-shanghai.aliyuncs.com +OSS_BUCKET=aiclw +OSS_ACCESS_KEY_ID=LTAI5tPB3Mg5A3p2imzrFjBp +OSS_ACCESS_KEY_SECRET=vg917zL9EWSXvosVeiEcE6w4QT25bV +OSS_PUBLIC_BASE_URL=https://aicdn-video-worker.oss-cn-shanghai.aliyuncs.com +OSS_PREFIX=video-worker + +LOG_LEVEL=INFO diff --git a/video_worker/.env.example b/video_worker/.env.example index e447043..acf679b 100644 --- a/video_worker/.env.example +++ b/video_worker/.env.example @@ -6,6 +6,14 @@ WORKER_BASE_URL=http://127.0.0.1:8000 WS_POLL_INTERVAL_SEC=1.0 EDGE_DISPATCH_HOST=0.0.0.0 EDGE_DISPATCH_PORT=8020 +EDGE_MAX_DISPATCH_RECORDS=2000 +OSS_ENABLED=false +OSS_ENDPOINT=https://oss-cn-hangzhou.aliyuncs.com +OSS_BUCKET=your-bucket +OSS_ACCESS_KEY_ID=your-ak +OSS_ACCESS_KEY_SECRET=your-sk +OSS_PUBLIC_BASE_URL=https://your-bucket.oss-cn-hangzhou.aliyuncs.com +OSS_PREFIX=video-worker OUTPUT_DIR=./outputs RUNTIME_DIR=./runtime SQLITE_PATH=./runtime/tasks.db diff --git a/video_worker/README.md b/video_worker/README.md index 4a056de..7fbf237 100644 --- a/video_worker/README.md +++ b/video_worker/README.md @@ -210,6 +210,8 @@ video_worker/ - 对外 HTTP 入口,触发 WS 下发给边缘设备 - `GET /dispatch/{dispatch_id}`(edge_dispatch_service) - 查询调度任务状态和结果 +- `POST /dispatch/{dispatch_id}/artifacts`(edge_dispatch_service) + - 边缘上传产物到中心,由中心服务直传 OSS,返回 OSS URL - `GET /devices`(edge_dispatch_service) - 查看在线边缘设备 - `WS /ws/edge/{device_id}`(edge_dispatch_service) @@ -339,6 +341,25 @@ curl -X POST http://:8020/dispatch/generate \ curl http://:8020/dispatch/ ``` +### OSS 直传链路(防止中心堆积) + +1. 边缘执行完成后将 `video.mp4/first_frame.jpg/metadata.json/run.log` 提交到: + - `POST /dispatch/{dispatch_id}/artifacts` +2. 中心服务不落地文件,直接流式上传到 OSS。 +3. 中心仅保存 `artifact_urls`(OSS URL),外部系统通过 `GET /dispatch/{dispatch_id}` 获取结果。 + +需要在 `.env` 配置 OSS: + +```env +OSS_ENABLED=true +OSS_ENDPOINT=https://oss-cn-hangzhou.aliyuncs.com +OSS_BUCKET=your-bucket +OSS_ACCESS_KEY_ID=your-ak +OSS_ACCESS_KEY_SECRET=your-sk +OSS_PUBLIC_BASE_URL=https://your-bucket.oss-cn-hangzhou.aliyuncs.com +OSS_PREFIX=video-worker +``` + ## 9. 常见问题 - `ffmpeg not found` diff --git a/video_worker/app/edge_dispatch_service.py b/video_worker/app/edge_dispatch_service.py index 9a8ac94..c98ff02 100644 --- a/video_worker/app/edge_dispatch_service.py +++ b/video_worker/app/edge_dispatch_service.py @@ -1,13 +1,16 @@ import asyncio from dataclasses import dataclass from datetime import datetime, timezone +from pathlib import Path from typing import Any, Optional from uuid import uuid4 -from fastapi import FastAPI, HTTPException, WebSocket, WebSocketDisconnect +from fastapi import FastAPI, File, Form, HTTPException, UploadFile, WebSocket, WebSocketDisconnect from pydantic import BaseModel +from app.oss_client import oss_uploader from app.schemas import GenerateRequest +from app.settings import settings def utc_now_iso() -> str: @@ -86,6 +89,7 @@ class EdgeDispatchManager: raise HTTPException(status_code=409, detail="no idle edge device available") async def create_dispatch(self, conn: EdgeConnection, req: GenerateRequest) -> dict[str, Any]: + await self._prune_if_needed() dispatch_id = uuid4().hex now = utc_now_iso() record = { @@ -97,6 +101,7 @@ class EdgeDispatchManager: "updated_at": now, "result": None, "error": None, + "artifact_urls": {}, } async with self.lock: @@ -125,6 +130,18 @@ class EdgeDispatchManager: raise return record + async def _prune_if_needed(self) -> None: + max_records = max(100, int(settings.edge_max_dispatch_records)) + async with self.lock: + total = len(self.dispatches) + if total < max_records: + return + over = total - max_records + 1 + done = [v for v in self.dispatches.values() if v.get("status") in {"SUCCEEDED", "FAILED"}] + done.sort(key=lambda x: x.get("updated_at", "")) + for rec in done[:over]: + self.dispatches.pop(rec["dispatch_id"], None) + async def mark_event(self, device_id: str, event: dict[str, Any]) -> None: now = utc_now_iso() async with self.lock: @@ -217,6 +234,54 @@ async def get_dispatch(dispatch_id: str) -> dict[str, Any]: return record +@app.post("/dispatch/{dispatch_id}/artifacts") +async def upload_artifacts( + dispatch_id: str, + task_id: str = Form(default=""), + status: str = Form(default="SUCCEEDED"), + files: list[UploadFile] = File(default_factory=list), +) -> dict[str, Any]: + record = manager.dispatches.get(dispatch_id) + if record is None: + raise HTTPException(status_code=404, detail=f"dispatch not found: {dispatch_id}") + if not oss_uploader.enabled: + raise HTTPException(status_code=400, detail="OSS upload is disabled, set OSS_ENABLED=true") + if not files: + raise HTTPException(status_code=400, detail="no files uploaded") + + uploaded: dict[str, dict[str, str]] = {} + for file in files: + name = file.filename or "artifact.bin" + try: + result = await asyncio.to_thread(oss_uploader.upload_fileobj, dispatch_id, name, file.file) + uploaded[Path(name).name] = result + finally: + await file.close() + + now = utc_now_iso() + async with manager.lock: + record["artifact_urls"] = uploaded + record["result"] = { + "event": "result", + "dispatch_id": dispatch_id, + "task_id": task_id or None, + "status": status, + "artifact_urls": uploaded, + } + record["status"] = status + record["updated_at"] = now + conn = manager.connections.get(record["device_id"]) + if conn: + conn.busy = False + + return { + "dispatch_id": dispatch_id, + "status": status, + "artifact_urls": uploaded, + "updated_at": now, + } + + @app.websocket("/ws/edge/{device_id}") async def edge_socket(websocket: WebSocket, device_id: str) -> None: await websocket.accept() diff --git a/video_worker/app/oss_client.py b/video_worker/app/oss_client.py new file mode 100644 index 0000000..d8de513 --- /dev/null +++ b/video_worker/app/oss_client.py @@ -0,0 +1,61 @@ +from __future__ import annotations + +from datetime import datetime, timezone +from pathlib import Path + +import oss2 + +from app.settings import settings + + +class OSSUploader: + def __init__(self) -> None: + self.enabled = bool(settings.oss_enabled) + if not self.enabled: + self.bucket = None + return + + if not all([ + settings.oss_endpoint, + settings.oss_bucket, + settings.oss_access_key_id, + settings.oss_access_key_secret, + ]): + raise RuntimeError("OSS is enabled but endpoint/bucket/ak/sk is not fully configured") + + auth = oss2.Auth(settings.oss_access_key_id, settings.oss_access_key_secret) + self.bucket = oss2.Bucket(auth, settings.oss_endpoint, settings.oss_bucket) + + @staticmethod + def _safe_name(name: str) -> str: + return name.replace("\\", "_").replace("/", "_") + + def _key(self, dispatch_id: str, filename: str) -> str: + date_part = datetime.now(timezone.utc).strftime("%Y%m%d") + safe_file = self._safe_name(Path(filename).name) + return f"{settings.oss_prefix.strip('/')}/{date_part}/{dispatch_id}/{safe_file}" + + def _public_url(self, key: str) -> str: + if settings.oss_public_base_url: + return f"{settings.oss_public_base_url.rstrip('/')}/{key}" + + endpoint = settings.oss_endpoint.rstrip("/") + if endpoint.startswith("http://") or endpoint.startswith("https://"): + return f"{endpoint}/{key}" + return f"https://{endpoint}/{key}" + + def upload_fileobj(self, dispatch_id: str, filename: str, fileobj) -> dict[str, str]: + if not self.enabled or self.bucket is None: + raise RuntimeError("OSS uploader is not enabled") + + key = self._key(dispatch_id, filename) + fileobj.seek(0) + self.bucket.put_object(key, fileobj) + return { + "filename": Path(filename).name, + "object_key": key, + "url": self._public_url(key), + } + + +oss_uploader = OSSUploader() diff --git a/video_worker/app/settings.py b/video_worker/app/settings.py index 34540a2..7a019f5 100644 --- a/video_worker/app/settings.py +++ b/video_worker/app/settings.py @@ -13,6 +13,15 @@ class Settings(BaseSettings): ws_poll_interval_sec: float = Field(default=1.0, alias="WS_POLL_INTERVAL_SEC") edge_dispatch_host: str = Field(default="0.0.0.0", alias="EDGE_DISPATCH_HOST") edge_dispatch_port: int = Field(default=8020, alias="EDGE_DISPATCH_PORT") + edge_max_dispatch_records: int = Field(default=2000, alias="EDGE_MAX_DISPATCH_RECORDS") + + oss_enabled: bool = Field(default=False, alias="OSS_ENABLED") + oss_endpoint: str = Field(default="", alias="OSS_ENDPOINT") + oss_bucket: str = Field(default="", alias="OSS_BUCKET") + oss_access_key_id: str = Field(default="", alias="OSS_ACCESS_KEY_ID") + oss_access_key_secret: str = Field(default="", alias="OSS_ACCESS_KEY_SECRET") + oss_public_base_url: str = Field(default="", alias="OSS_PUBLIC_BASE_URL") + oss_prefix: str = Field(default="video-worker", alias="OSS_PREFIX") output_dir: Path = Field(default=Path("./outputs"), alias="OUTPUT_DIR") runtime_dir: Path = Field(default=Path("./runtime"), alias="RUNTIME_DIR") diff --git a/video_worker/docker-compose.center-dispatch.yml b/video_worker/docker-compose.center-dispatch.yml new file mode 100644 index 0000000..168c922 --- /dev/null +++ b/video_worker/docker-compose.center-dispatch.yml @@ -0,0 +1,22 @@ +services: + edge-dispatch: + build: + context: . + dockerfile: docker/edge-dispatch/Dockerfile + container_name: video-worker-center-dispatch + env_file: + - .env.center + command: + [ + "python", + "-m", + "uvicorn", + "app.edge_dispatch_service:app", + "--host", + "0.0.0.0", + "--port", + "${EDGE_DISPATCH_PORT:-8020}" + ] + ports: + - "${EDGE_DISPATCH_PORT:-8020}:${EDGE_DISPATCH_PORT:-8020}" + restart: unless-stopped diff --git a/video_worker/requirements.txt b/video_worker/requirements.txt index 06d1f01..858cdfe 100644 --- a/video_worker/requirements.txt +++ b/video_worker/requirements.txt @@ -11,3 +11,4 @@ safetensors==0.6.2 Pillow==11.3.0 requests==2.32.4 websockets==15.0.1 +oss2==2.18.5 diff --git a/video_worker/scripts/edge_device_client.py b/video_worker/scripts/edge_device_client.py index f3530f5..999a3d6 100644 --- a/video_worker/scripts/edge_device_client.py +++ b/video_worker/scripts/edge_device_client.py @@ -1,8 +1,10 @@ import asyncio import json import os +from pathlib import Path import sys import time +from urllib.parse import urlparse import requests import websockets @@ -10,11 +12,23 @@ import websockets DISPATCH_WS_URL = os.getenv("DISPATCH_WS_URL", "ws://127.0.0.1:8020/ws/edge/edge-a4000-01") WORKER_BASE_URL = os.getenv("WORKER_BASE_URL", "http://127.0.0.1:8000") POLL_INTERVAL = float(os.getenv("EDGE_POLL_INTERVAL_SEC", "1.0")) +DISPATCH_HTTP_BASE = os.getenv("DISPATCH_HTTP_BASE", "") if len(sys.argv) > 1: DISPATCH_WS_URL = sys.argv[1] +def infer_http_base(ws_url: str) -> str: + parsed = urlparse(ws_url) + scheme = "https" if parsed.scheme == "wss" else "http" + host = parsed.netloc + return f"{scheme}://{host}" + + +if not DISPATCH_HTTP_BASE: + DISPATCH_HTTP_BASE = infer_http_base(DISPATCH_WS_URL) + + def worker_post(path: str, payload: dict): r = requests.post(f"{WORKER_BASE_URL}{path}", json=payload, timeout=30) r.raise_for_status() @@ -27,6 +41,43 @@ def worker_get(path: str): return r.json() +def upload_artifacts(dispatch_id: str, task_id: str, result: dict) -> dict: + candidate_fields = ["video_path", "first_frame_path", "metadata_path", "log_path"] + existing_paths = [] + for field in candidate_fields: + p = result.get(field) + if p and Path(p).exists(): + existing_paths.append(Path(p)) + + if not existing_paths: + return {} + + opened = [] + files = [] + try: + for path in existing_paths: + fh = path.open("rb") + opened.append(fh) + files.append(("files", (path.name, fh, "application/octet-stream"))) + + data = { + "task_id": task_id, + "status": result.get("status", "SUCCEEDED"), + } + resp = requests.post( + f"{DISPATCH_HTTP_BASE}/dispatch/{dispatch_id}/artifacts", + data=data, + files=files, + timeout=600, + ) + resp.raise_for_status() + payload = resp.json() + return payload.get("artifact_urls", {}) + finally: + for fh in opened: + fh.close() + + async def handle_generate(ws, data: dict): dispatch_id = data["dispatch_id"] req = data["request"] @@ -51,16 +102,20 @@ async def handle_generate(ws, data: dict): ) if status["status"] in {"SUCCEEDED", "FAILED"}: result = await asyncio.to_thread(worker_get, f"/tasks/{task_id}/result") + artifact_urls = {} + result_payload = { + "event": "result", + "dispatch_id": dispatch_id, + "task_id": task_id, + "status": result.get("status", status["status"]), + } + if status["status"] == "SUCCEEDED": + artifact_urls = await asyncio.to_thread(upload_artifacts, dispatch_id, task_id, result) + result_payload["artifact_urls"] = artifact_urls + else: + result_payload["error"] = result.get("error") await ws.send( - json.dumps( - { - "event": "result", - "dispatch_id": dispatch_id, - "task_id": task_id, - **result, - }, - ensure_ascii=False, - ) + json.dumps(result_payload, ensure_ascii=False) ) return await asyncio.sleep(POLL_INTERVAL) diff --git a/video_worker/scripts/start_center_dispatch.sh b/video_worker/scripts/start_center_dispatch.sh new file mode 100755 index 0000000..e6e4598 --- /dev/null +++ b/video_worker/scripts/start_center_dispatch.sh @@ -0,0 +1,39 @@ +#!/usr/bin/env bash +set -euo pipefail + +ROOT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")/.." && pwd)" +cd "$ROOT_DIR" + +if ! command -v docker >/dev/null 2>&1; then + echo "[ERROR] docker not found" + exit 1 +fi + +if ! docker compose version >/dev/null 2>&1; then + echo "[ERROR] docker compose not available" + exit 1 +fi + +if [ ! -f .env.center ]; then + cp .env.center.example .env.center + echo "[ERROR] .env.center was missing; template created at .env.center" + echo "Please edit OSS and port settings, then rerun this script." + exit 1 +fi + +# hard split: center project name + dedicated compose/env file +PROJECT_NAME="video-worker-center" +COMPOSE_FILE="docker-compose.center-dispatch.yml" + +EDGE_PORT=$(grep '^EDGE_DISPATCH_PORT=' .env.center | tail -n1 | cut -d'=' -f2- || true) +EDGE_PORT="${EDGE_PORT:-8020}" + +docker compose \ + --project-name "$PROJECT_NAME" \ + --env-file .env.center \ + -f "$COMPOSE_FILE" \ + up -d --build + +echo "[OK] center dispatch service started" +echo "[INFO] health: curl http://127.0.0.1:${EDGE_PORT}/health" +echo "[INFO] devices: curl http://127.0.0.1:${EDGE_PORT}/devices" diff --git a/video_worker/scripts/stop_center_dispatch.sh b/video_worker/scripts/stop_center_dispatch.sh new file mode 100755 index 0000000..052b022 --- /dev/null +++ b/video_worker/scripts/stop_center_dispatch.sh @@ -0,0 +1,13 @@ +#!/usr/bin/env bash +set -euo pipefail + +ROOT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")/.." && pwd)" +cd "$ROOT_DIR" + +docker compose \ + --project-name video-worker-center \ + --env-file .env.center \ + -f docker-compose.center-dispatch.yml \ + down + +echo "[OK] center dispatch service stopped"