feat: 初始化零工后端代码
This commit is contained in:
105
gig-poc/apps/api/app/services/ingest_queue.py
Normal file
105
gig-poc/apps/api/app/services/ingest_queue.py
Normal file
@@ -0,0 +1,105 @@
|
||||
from __future__ import annotations
|
||||
|
||||
from dataclasses import dataclass
|
||||
from queue import Empty, Full, Queue
|
||||
from threading import Event, Lock, Thread
|
||||
from typing import Any
|
||||
|
||||
from app.core.config import Settings
|
||||
from app.core.logging import logger
|
||||
from app.db.session import SessionLocal
|
||||
from app.domain.schemas import JobCard, WorkerCard
|
||||
from app.services.ingest_service import IngestService
|
||||
from app.utils.ids import generate_id
|
||||
|
||||
|
||||
@dataclass
|
||||
class QueueTask:
|
||||
task_id: str
|
||||
kind: str
|
||||
payload: dict[str, Any]
|
||||
|
||||
|
||||
class IngestQueue:
|
||||
def __init__(self, settings: Settings):
|
||||
self.settings = settings
|
||||
self.queue: Queue[QueueTask] = Queue(maxsize=settings.ingest_queue_max_size)
|
||||
self._stop_event = Event()
|
||||
self._thread: Thread | None = None
|
||||
self._lock = Lock()
|
||||
self._status: dict[str, str] = {}
|
||||
self._processed = 0
|
||||
self._failed = 0
|
||||
|
||||
def start(self) -> None:
|
||||
if not self.settings.ingest_async_enabled:
|
||||
return
|
||||
if self._thread and self._thread.is_alive():
|
||||
return
|
||||
self._thread = Thread(target=self._run, daemon=True, name="ingest-queue-worker")
|
||||
self._thread.start()
|
||||
logger.info("ingest queue worker started")
|
||||
|
||||
def stop(self) -> None:
|
||||
self._stop_event.set()
|
||||
if self._thread and self._thread.is_alive():
|
||||
self._thread.join(timeout=3)
|
||||
|
||||
def enqueue_job(self, card: JobCard) -> str:
|
||||
return self._enqueue("job", card.model_dump(mode="json"))
|
||||
|
||||
def enqueue_worker(self, card: WorkerCard) -> str:
|
||||
return self._enqueue("worker", card.model_dump(mode="json"))
|
||||
|
||||
def task_status(self, task_id: str) -> str:
|
||||
with self._lock:
|
||||
return self._status.get(task_id, "not_found")
|
||||
|
||||
def stats(self) -> dict[str, int]:
|
||||
with self._lock:
|
||||
return {
|
||||
"queued": self.queue.qsize(),
|
||||
"processed": self._processed,
|
||||
"failed": self._failed,
|
||||
}
|
||||
|
||||
def _enqueue(self, kind: str, payload: dict[str, Any]) -> str:
|
||||
task_id = generate_id("queue")
|
||||
task = QueueTask(task_id=task_id, kind=kind, payload=payload)
|
||||
with self._lock:
|
||||
self._status[task_id] = "queued"
|
||||
try:
|
||||
self.queue.put_nowait(task)
|
||||
except Full as exc:
|
||||
with self._lock:
|
||||
self._status[task_id] = "rejected"
|
||||
raise RuntimeError("ingest queue is full") from exc
|
||||
return task_id
|
||||
|
||||
def _run(self) -> None:
|
||||
while not self._stop_event.is_set():
|
||||
try:
|
||||
task = self.queue.get(timeout=0.5)
|
||||
except Empty:
|
||||
continue
|
||||
try:
|
||||
with self._lock:
|
||||
self._status[task.task_id] = "processing"
|
||||
with SessionLocal() as db:
|
||||
service = IngestService(db)
|
||||
if task.kind == "job":
|
||||
service.ingest_job(JobCard(**task.payload))
|
||||
elif task.kind == "worker":
|
||||
service.ingest_worker(WorkerCard(**task.payload))
|
||||
else:
|
||||
raise ValueError(f"unknown task kind {task.kind}")
|
||||
with self._lock:
|
||||
self._status[task.task_id] = "done"
|
||||
self._processed += 1
|
||||
except Exception:
|
||||
logger.exception("ingest queue task failed task_id=%s kind=%s", task.task_id, task.kind)
|
||||
with self._lock:
|
||||
self._status[task.task_id] = "failed"
|
||||
self._failed += 1
|
||||
finally:
|
||||
self.queue.task_done()
|
||||
Reference in New Issue
Block a user