# Source file: Airtep/gig-poc/apps/api/app/services/match_queue.py
# Snapshot taken: 2026-04-01 14:19:25 +08:00
# Snapshot size: 122 lines, 4.5 KiB (Python)

from __future__ import annotations
from dataclasses import dataclass
from queue import Empty, Full, Queue
from threading import Event, Lock, Thread
from typing import Any
from app.core.config import Settings
from app.core.logging import logger
from app.db.session import SessionLocal
from app.domain.schemas import MatchResult
from app.repositories.job_repository import JobRepository
from app.repositories.worker_repository import WorkerRepository
from app.services.card_mapper import job_to_card, worker_to_card
from app.services.matching_service import MatchingService
from app.utils.ids import generate_id
@dataclass
class MatchTask:
    """One unit of work placed on the match queue.

    Instances are created by ``MatchQueue._enqueue`` and consumed by the
    queue's worker loop.
    """

    # Identifier under which status and results are recorded.
    task_id: str
    # Task type; the worker loop handles "workers" and "jobs".
    kind: str
    # Id of the entity to match from: a job id for "workers" tasks,
    # a worker id for "jobs" tasks.
    source_id: str
    # Passed through to the matching service as its second argument.
    top_n: int
class MatchQueue:
    """Bounded in-process queue that runs match computations on one thread.

    Tasks are accepted through ``enqueue_workers`` / ``enqueue_jobs``, which
    return a task id. A single daemon thread (started by ``start``) takes
    tasks off the queue, runs the matching service, and records the outcome.
    Callers poll ``task_status`` / ``task_result`` with the task id.

    Status values written by this class: ``"queued"``, ``"processing"``,
    ``"done"``, ``"failed"``. ``task_status`` reports ``"not_found"`` for an
    id it has no record of.

    ``_status``, ``_results`` and the two counters are only touched while
    holding ``_lock``.

    NOTE(review): nothing in this class removes entries from ``_status`` or
    ``_results``, so both grow with every accepted task for as long as the
    instance lives. Consider eviction if the process is long-lived.
    """

    # Seconds the worker blocks on an empty queue before it re-checks the
    # stop flag; bounds how long ``stop`` has to wait for an idle worker.
    _POLL_INTERVAL_SECONDS = 0.5
    # Maximum seconds ``stop`` waits for the worker thread to exit.
    _JOIN_TIMEOUT_SECONDS = 3

    def __init__(self, settings: Settings):
        """Create an idle queue; no thread is started until ``start``.

        Args:
            settings: Read for ``match_queue_max_size`` (queue capacity) and,
                in ``start``, ``match_async_enabled``.
        """
        self.settings = settings
        self.queue: Queue[MatchTask] = Queue(maxsize=settings.match_queue_max_size)
        self._stop_event = Event()
        self._thread: Thread | None = None
        self._lock = Lock()
        self._status: dict[str, str] = {}
        self._results: dict[str, list[dict[str, Any]]] = {}
        self._processed = 0
        self._failed = 0

    def start(self) -> None:
        """Start the worker thread if async matching is enabled.

        Does nothing when ``settings.match_async_enabled`` is falsy or when a
        worker thread is already alive. May be called again after ``stop``.
        """
        if not self.settings.match_async_enabled:
            return
        if self._thread and self._thread.is_alive():
            return
        # A previous stop() leaves the flag set; without clearing it the new
        # thread would leave its loop immediately and never process a task.
        self._stop_event.clear()
        self._thread = Thread(target=self._run, daemon=True, name="match-queue-worker")
        self._thread.start()
        logger.info("match queue worker started")

    def stop(self) -> None:
        """Ask the worker to stop and wait briefly for it to exit.

        Waits at most ``_JOIN_TIMEOUT_SECONDS``; a worker that is still busy
        with a task after that is left running. Tasks still in the queue are
        not processed and keep their ``"queued"`` status.
        """
        self._stop_event.set()
        if self._thread and self._thread.is_alive():
            self._thread.join(timeout=self._JOIN_TIMEOUT_SECONDS)

    def enqueue_workers(self, job_id: str, top_n: int) -> str:
        """Queue a "workers" task for ``job_id`` and return its task id.

        Raises:
            RuntimeError: If the queue is full.
        """
        return self._enqueue("workers", job_id, top_n)

    def enqueue_jobs(self, worker_id: str, top_n: int) -> str:
        """Queue a "jobs" task for ``worker_id`` and return its task id.

        Raises:
            RuntimeError: If the queue is full.
        """
        return self._enqueue("jobs", worker_id, top_n)

    def task_status(self, task_id: str) -> str:
        """Return the recorded status for ``task_id``, or ``"not_found"``."""
        with self._lock:
            return self._status.get(task_id, "not_found")

    def task_result(self, task_id: str) -> list[dict[str, Any]] | None:
        """Return the stored result list for ``task_id``, or ``None``.

        ``None`` is returned for unknown ids and for tasks that have no
        stored result (not finished, or failed). The returned list is the
        stored object itself, not a copy.
        """
        with self._lock:
            return self._results.get(task_id)

    def stats(self) -> dict[str, int]:
        """Return the current queue size and the processed/failed counters."""
        with self._lock:
            return {
                "queued": self.queue.qsize(),
                "processed": self._processed,
                "failed": self._failed,
            }

    def _enqueue(self, kind: str, source_id: str, top_n: int) -> str:
        """Create a task, record it as queued, and put it on the queue.

        Args:
            kind: Task type, ``"workers"`` or ``"jobs"``.
            source_id: Id of the job or worker to match from.
            top_n: Forwarded to the matching service.

        Returns:
            The generated task id.

        Raises:
            RuntimeError: If the queue is full. Nothing is recorded for the
                rejected task, since its id never reaches the caller.
        """
        task_id = generate_id("mq")
        task = MatchTask(task_id=task_id, kind=kind, source_id=source_id, top_n=top_n)
        # Record "queued" before the task becomes visible to the worker so
        # this write can never overwrite the worker's "processing" update.
        with self._lock:
            self._status[task_id] = "queued"
        try:
            self.queue.put_nowait(task)
        except Full as exc:
            # The id is not returned on this path, so keeping a status entry
            # would only leak memory while the queue is overloaded.
            with self._lock:
                self._status.pop(task_id, None)
            raise RuntimeError("match queue is full") from exc
        return task_id

    def _run(self) -> None:
        """Worker loop: process queued tasks until the stop flag is set.

        Any exception raised while handling a task is logged and recorded as
        a ``"failed"`` status; it never terminates the loop.
        """
        while not self._stop_event.is_set():
            try:
                task = self.queue.get(timeout=self._POLL_INTERVAL_SECONDS)
            except Empty:
                # Timed out on an empty queue: loop to re-check the stop flag.
                continue
            try:
                with self._lock:
                    self._status[task.task_id] = "processing"
                # Compute and serialise outside the lock so status/result
                # readers are not blocked while a task is running.
                payload = self._execute(task)
                with self._lock:
                    self._status[task.task_id] = "done"
                    self._results[task.task_id] = payload
                    self._processed += 1
            except Exception:
                logger.exception("match queue task failed task_id=%s kind=%s", task.task_id, task.kind)
                with self._lock:
                    self._status[task.task_id] = "failed"
                    self._failed += 1
            finally:
                self.queue.task_done()

    def _execute(self, task: MatchTask) -> list[dict[str, Any]]:
        """Run the match for one task and return the serialised items.

        Raises:
            ValueError: If the referenced job/worker does not exist or
                ``task.kind`` is not ``"workers"`` or ``"jobs"``.
        """
        with SessionLocal() as db:
            service = MatchingService(db)
            if task.kind == "workers":
                job = JobRepository(db).get(task.source_id)
                if job is None:
                    raise ValueError("job not found")
                items = service.match_workers(job_to_card(job), task.top_n)
            elif task.kind == "jobs":
                worker = WorkerRepository(db).get(task.source_id)
                if worker is None:
                    raise ValueError("worker not found")
                items = service.match_jobs(worker_to_card(worker), task.top_n)
            else:
                raise ValueError(f"unknown task kind {task.kind}")
            # Serialise before the session closes, in case dumping an item
            # still needs the session (cannot be confirmed from this file).
            return [item.model_dump(mode="json") for item in items]