fix: 新增数据模块

This commit is contained in:
Daniel
2026-04-07 01:34:49 +08:00
parent e606b3dcd6
commit f529aa3279
11 changed files with 319 additions and 10 deletions

View File

@@ -0,0 +1,15 @@
# Center dispatch service only
EDGE_DISPATCH_HOST=0.0.0.0
EDGE_DISPATCH_PORT=8060
EDGE_MAX_DISPATCH_RECORDS=2000
# OSS (required in production)
OSS_ENABLED=true
OSS_ENDPOINT=https://oss-cn-shanghai.aliyuncs.com
OSS_BUCKET=aiclw
OSS_ACCESS_KEY_ID=LTAI5tPB3Mg5A3p2imzrFjBp
OSS_ACCESS_KEY_SECRET=vg917zL9EWSXvosVeiEcE6w4QT25bV
OSS_PUBLIC_BASE_URL=https://aicdn-video-worker.oss-cn-shanghai.aliyuncs.com
OSS_PREFIX=video-worker
LOG_LEVEL=INFO

View File

@@ -6,6 +6,14 @@ WORKER_BASE_URL=http://127.0.0.1:8000
WS_POLL_INTERVAL_SEC=1.0
EDGE_DISPATCH_HOST=0.0.0.0
EDGE_DISPATCH_PORT=8020
EDGE_MAX_DISPATCH_RECORDS=2000
OSS_ENABLED=false
OSS_ENDPOINT=https://oss-cn-hangzhou.aliyuncs.com
OSS_BUCKET=your-bucket
OSS_ACCESS_KEY_ID=your-ak
OSS_ACCESS_KEY_SECRET=your-sk
OSS_PUBLIC_BASE_URL=https://your-bucket.oss-cn-hangzhou.aliyuncs.com
OSS_PREFIX=video-worker
OUTPUT_DIR=./outputs
RUNTIME_DIR=./runtime
SQLITE_PATH=./runtime/tasks.db

View File

@@ -210,6 +210,8 @@ video_worker/
- 对外 HTTP 入口,触发 WS 下发给边缘设备
- `GET /dispatch/{dispatch_id}`edge_dispatch_service
- 查询调度任务状态和结果
- `POST /dispatch/{dispatch_id}/artifacts`edge_dispatch_service
- 边缘上传产物到中心,由中心服务直传 OSS返回 OSS URL
- `GET /devices`edge_dispatch_service
- 查看在线边缘设备
- `WS /ws/edge/{device_id}`edge_dispatch_service
@@ -339,6 +341,25 @@ curl -X POST http://<dispatch-host>:8020/dispatch/generate \
curl http://<dispatch-host>:8020/dispatch/<dispatch_id>
```
### OSS 直传链路(防止中心堆积)
1. 边缘执行完成后将 `video.mp4/first_frame.jpg/metadata.json/run.log` 提交到:
- `POST /dispatch/{dispatch_id}/artifacts`
2. 中心服务不落地文件,直接流式上传到 OSS。
3. 中心仅保存 `artifact_urls`OSS URL外部系统通过 `GET /dispatch/{dispatch_id}` 获取结果。
需要在 `.env` 配置 OSS
```env
OSS_ENABLED=true
OSS_ENDPOINT=https://oss-cn-hangzhou.aliyuncs.com
OSS_BUCKET=your-bucket
OSS_ACCESS_KEY_ID=your-ak
OSS_ACCESS_KEY_SECRET=your-sk
OSS_PUBLIC_BASE_URL=https://your-bucket.oss-cn-hangzhou.aliyuncs.com
OSS_PREFIX=video-worker
```
## 9. 常见问题
- `ffmpeg not found`

View File

@@ -1,13 +1,16 @@
import asyncio
from dataclasses import dataclass
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Optional
from uuid import uuid4
from fastapi import FastAPI, HTTPException, WebSocket, WebSocketDisconnect
from fastapi import FastAPI, File, Form, HTTPException, UploadFile, WebSocket, WebSocketDisconnect
from pydantic import BaseModel
from app.oss_client import oss_uploader
from app.schemas import GenerateRequest
from app.settings import settings
def utc_now_iso() -> str:
@@ -86,6 +89,7 @@ class EdgeDispatchManager:
raise HTTPException(status_code=409, detail="no idle edge device available")
async def create_dispatch(self, conn: EdgeConnection, req: GenerateRequest) -> dict[str, Any]:
await self._prune_if_needed()
dispatch_id = uuid4().hex
now = utc_now_iso()
record = {
@@ -97,6 +101,7 @@ class EdgeDispatchManager:
"updated_at": now,
"result": None,
"error": None,
"artifact_urls": {},
}
async with self.lock:
@@ -125,6 +130,18 @@ class EdgeDispatchManager:
raise
return record
async def _prune_if_needed(self) -> None:
max_records = max(100, int(settings.edge_max_dispatch_records))
async with self.lock:
total = len(self.dispatches)
if total < max_records:
return
over = total - max_records + 1
done = [v for v in self.dispatches.values() if v.get("status") in {"SUCCEEDED", "FAILED"}]
done.sort(key=lambda x: x.get("updated_at", ""))
for rec in done[:over]:
self.dispatches.pop(rec["dispatch_id"], None)
async def mark_event(self, device_id: str, event: dict[str, Any]) -> None:
now = utc_now_iso()
async with self.lock:
@@ -217,6 +234,54 @@ async def get_dispatch(dispatch_id: str) -> dict[str, Any]:
return record
@app.post("/dispatch/{dispatch_id}/artifacts")
async def upload_artifacts(
dispatch_id: str,
task_id: str = Form(default=""),
status: str = Form(default="SUCCEEDED"),
files: list[UploadFile] = File(default_factory=list),
) -> dict[str, Any]:
record = manager.dispatches.get(dispatch_id)
if record is None:
raise HTTPException(status_code=404, detail=f"dispatch not found: {dispatch_id}")
if not oss_uploader.enabled:
raise HTTPException(status_code=400, detail="OSS upload is disabled, set OSS_ENABLED=true")
if not files:
raise HTTPException(status_code=400, detail="no files uploaded")
uploaded: dict[str, dict[str, str]] = {}
for file in files:
name = file.filename or "artifact.bin"
try:
result = await asyncio.to_thread(oss_uploader.upload_fileobj, dispatch_id, name, file.file)
uploaded[Path(name).name] = result
finally:
await file.close()
now = utc_now_iso()
async with manager.lock:
record["artifact_urls"] = uploaded
record["result"] = {
"event": "result",
"dispatch_id": dispatch_id,
"task_id": task_id or None,
"status": status,
"artifact_urls": uploaded,
}
record["status"] = status
record["updated_at"] = now
conn = manager.connections.get(record["device_id"])
if conn:
conn.busy = False
return {
"dispatch_id": dispatch_id,
"status": status,
"artifact_urls": uploaded,
"updated_at": now,
}
@app.websocket("/ws/edge/{device_id}")
async def edge_socket(websocket: WebSocket, device_id: str) -> None:
await websocket.accept()

View File

@@ -0,0 +1,61 @@
from __future__ import annotations
from datetime import datetime, timezone
from pathlib import Path
import oss2
from app.settings import settings
class OSSUploader:
def __init__(self) -> None:
self.enabled = bool(settings.oss_enabled)
if not self.enabled:
self.bucket = None
return
if not all([
settings.oss_endpoint,
settings.oss_bucket,
settings.oss_access_key_id,
settings.oss_access_key_secret,
]):
raise RuntimeError("OSS is enabled but endpoint/bucket/ak/sk is not fully configured")
auth = oss2.Auth(settings.oss_access_key_id, settings.oss_access_key_secret)
self.bucket = oss2.Bucket(auth, settings.oss_endpoint, settings.oss_bucket)
@staticmethod
def _safe_name(name: str) -> str:
return name.replace("\\", "_").replace("/", "_")
def _key(self, dispatch_id: str, filename: str) -> str:
date_part = datetime.now(timezone.utc).strftime("%Y%m%d")
safe_file = self._safe_name(Path(filename).name)
return f"{settings.oss_prefix.strip('/')}/{date_part}/{dispatch_id}/{safe_file}"
def _public_url(self, key: str) -> str:
if settings.oss_public_base_url:
return f"{settings.oss_public_base_url.rstrip('/')}/{key}"
endpoint = settings.oss_endpoint.rstrip("/")
if endpoint.startswith("http://") or endpoint.startswith("https://"):
return f"{endpoint}/{key}"
return f"https://{endpoint}/{key}"
def upload_fileobj(self, dispatch_id: str, filename: str, fileobj) -> dict[str, str]:
if not self.enabled or self.bucket is None:
raise RuntimeError("OSS uploader is not enabled")
key = self._key(dispatch_id, filename)
fileobj.seek(0)
self.bucket.put_object(key, fileobj)
return {
"filename": Path(filename).name,
"object_key": key,
"url": self._public_url(key),
}
oss_uploader = OSSUploader()

View File

@@ -13,6 +13,15 @@ class Settings(BaseSettings):
ws_poll_interval_sec: float = Field(default=1.0, alias="WS_POLL_INTERVAL_SEC")
edge_dispatch_host: str = Field(default="0.0.0.0", alias="EDGE_DISPATCH_HOST")
edge_dispatch_port: int = Field(default=8020, alias="EDGE_DISPATCH_PORT")
edge_max_dispatch_records: int = Field(default=2000, alias="EDGE_MAX_DISPATCH_RECORDS")
oss_enabled: bool = Field(default=False, alias="OSS_ENABLED")
oss_endpoint: str = Field(default="", alias="OSS_ENDPOINT")
oss_bucket: str = Field(default="", alias="OSS_BUCKET")
oss_access_key_id: str = Field(default="", alias="OSS_ACCESS_KEY_ID")
oss_access_key_secret: str = Field(default="", alias="OSS_ACCESS_KEY_SECRET")
oss_public_base_url: str = Field(default="", alias="OSS_PUBLIC_BASE_URL")
oss_prefix: str = Field(default="video-worker", alias="OSS_PREFIX")
output_dir: Path = Field(default=Path("./outputs"), alias="OUTPUT_DIR")
runtime_dir: Path = Field(default=Path("./runtime"), alias="RUNTIME_DIR")

View File

@@ -0,0 +1,22 @@
services:
edge-dispatch:
build:
context: .
dockerfile: docker/edge-dispatch/Dockerfile
container_name: video-worker-center-dispatch
env_file:
- .env.center
command:
[
"python",
"-m",
"uvicorn",
"app.edge_dispatch_service:app",
"--host",
"0.0.0.0",
"--port",
"${EDGE_DISPATCH_PORT:-8020}"
]
ports:
- "${EDGE_DISPATCH_PORT:-8020}:${EDGE_DISPATCH_PORT:-8020}"
restart: unless-stopped

View File

@@ -11,3 +11,4 @@ safetensors==0.6.2
Pillow==11.3.0
requests==2.32.4
websockets==15.0.1
oss2==2.18.5

View File

@@ -1,8 +1,10 @@
import asyncio
import json
import os
from pathlib import Path
import sys
import time
from urllib.parse import urlparse
import requests
import websockets
@@ -10,11 +12,23 @@ import websockets
DISPATCH_WS_URL = os.getenv("DISPATCH_WS_URL", "ws://127.0.0.1:8020/ws/edge/edge-a4000-01")
WORKER_BASE_URL = os.getenv("WORKER_BASE_URL", "http://127.0.0.1:8000")
POLL_INTERVAL = float(os.getenv("EDGE_POLL_INTERVAL_SEC", "1.0"))
DISPATCH_HTTP_BASE = os.getenv("DISPATCH_HTTP_BASE", "")
if len(sys.argv) > 1:
DISPATCH_WS_URL = sys.argv[1]
def infer_http_base(ws_url: str) -> str:
parsed = urlparse(ws_url)
scheme = "https" if parsed.scheme == "wss" else "http"
host = parsed.netloc
return f"{scheme}://{host}"
if not DISPATCH_HTTP_BASE:
DISPATCH_HTTP_BASE = infer_http_base(DISPATCH_WS_URL)
def worker_post(path: str, payload: dict):
r = requests.post(f"{WORKER_BASE_URL}{path}", json=payload, timeout=30)
r.raise_for_status()
@@ -27,6 +41,43 @@ def worker_get(path: str):
return r.json()
def upload_artifacts(dispatch_id: str, task_id: str, result: dict) -> dict:
candidate_fields = ["video_path", "first_frame_path", "metadata_path", "log_path"]
existing_paths = []
for field in candidate_fields:
p = result.get(field)
if p and Path(p).exists():
existing_paths.append(Path(p))
if not existing_paths:
return {}
opened = []
files = []
try:
for path in existing_paths:
fh = path.open("rb")
opened.append(fh)
files.append(("files", (path.name, fh, "application/octet-stream")))
data = {
"task_id": task_id,
"status": result.get("status", "SUCCEEDED"),
}
resp = requests.post(
f"{DISPATCH_HTTP_BASE}/dispatch/{dispatch_id}/artifacts",
data=data,
files=files,
timeout=600,
)
resp.raise_for_status()
payload = resp.json()
return payload.get("artifact_urls", {})
finally:
for fh in opened:
fh.close()
async def handle_generate(ws, data: dict):
dispatch_id = data["dispatch_id"]
req = data["request"]
@@ -51,16 +102,20 @@ async def handle_generate(ws, data: dict):
)
if status["status"] in {"SUCCEEDED", "FAILED"}:
result = await asyncio.to_thread(worker_get, f"/tasks/{task_id}/result")
artifact_urls = {}
result_payload = {
"event": "result",
"dispatch_id": dispatch_id,
"task_id": task_id,
"status": result.get("status", status["status"]),
}
if status["status"] == "SUCCEEDED":
artifact_urls = await asyncio.to_thread(upload_artifacts, dispatch_id, task_id, result)
result_payload["artifact_urls"] = artifact_urls
else:
result_payload["error"] = result.get("error")
await ws.send(
json.dumps(
{
"event": "result",
"dispatch_id": dispatch_id,
"task_id": task_id,
**result,
},
ensure_ascii=False,
)
json.dumps(result_payload, ensure_ascii=False)
)
return
await asyncio.sleep(POLL_INTERVAL)

View File

@@ -0,0 +1,39 @@
#!/usr/bin/env bash
set -euo pipefail
ROOT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")/.." && pwd)"
cd "$ROOT_DIR"
if ! command -v docker >/dev/null 2>&1; then
echo "[ERROR] docker not found"
exit 1
fi
if ! docker compose version >/dev/null 2>&1; then
echo "[ERROR] docker compose not available"
exit 1
fi
if [ ! -f .env.center ]; then
cp .env.center.example .env.center
echo "[ERROR] .env.center was missing; template created at .env.center"
echo "Please edit OSS and port settings, then rerun this script."
exit 1
fi
# hard split: center project name + dedicated compose/env file
PROJECT_NAME="video-worker-center"
COMPOSE_FILE="docker-compose.center-dispatch.yml"
EDGE_PORT=$(grep '^EDGE_DISPATCH_PORT=' .env.center | tail -n1 | cut -d'=' -f2- || true)
EDGE_PORT="${EDGE_PORT:-8020}"
docker compose \
--project-name "$PROJECT_NAME" \
--env-file .env.center \
-f "$COMPOSE_FILE" \
up -d --build
echo "[OK] center dispatch service started"
echo "[INFO] health: curl http://127.0.0.1:${EDGE_PORT}/health"
echo "[INFO] devices: curl http://127.0.0.1:${EDGE_PORT}/devices"

View File

@@ -0,0 +1,13 @@
#!/usr/bin/env bash
set -euo pipefail
ROOT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")/.." && pwd)"
cd "$ROOT_DIR"
docker compose \
--project-name video-worker-center \
--env-file .env.center \
-f docker-compose.center-dispatch.yml \
down
echo "[OK] center dispatch service stopped"