from __future__ import annotations

from dataclasses import dataclass
from queue import Empty, Full, Queue
from threading import Event, Lock, Thread
from typing import Any

from app.core.config import Settings
from app.core.logging import logger
from app.db.session import SessionLocal
from app.domain.schemas import MatchResult
from app.repositories.job_repository import JobRepository
from app.repositories.worker_repository import WorkerRepository
from app.services.card_mapper import job_to_card, worker_to_card
from app.services.matching_service import MatchingService
from app.utils.ids import generate_id


@dataclass
class MatchTask:
    """One unit of matching work handed to the background worker."""

    # Identifier returned to the caller; key into the status/result tables.
    task_id: str
    # "workers" (find workers for a job) or "jobs" (find jobs for a worker).
    kind: str
    # Job id when kind == "workers", worker id when kind == "jobs".
    source_id: str
    # Passed through unchanged to the matching service.
    top_n: int


class MatchQueue:
    """Bounded in-process queue that runs match tasks on one daemon thread.

    Callers enqueue a task, receive a task id, and poll ``task_status`` /
    ``task_result``.  Status values written by this class are ``"queued"``,
    ``"processing"``, ``"done"`` and ``"failed"``; unknown ids report
    ``"not_found"``.

    NOTE(review): nothing in this class evicts entries from ``_status`` or
    ``_results``, so both grow for the lifetime of the process -- confirm
    whether a retention policy is needed.
    """

    def __init__(self, settings: Settings):
        """Create an idle queue sized by ``settings.match_queue_max_size``."""
        self.settings = settings
        self.queue: Queue[MatchTask] = Queue(maxsize=settings.match_queue_max_size)
        self._stop_event = Event()
        self._thread: Thread | None = None
        # Guards _status, _results and the two counters below.
        self._lock = Lock()
        self._status: dict[str, str] = {}
        self._results: dict[str, list[dict[str, Any]]] = {}
        self._processed = 0
        self._failed = 0

    def start(self) -> None:
        """Start the worker thread if async matching is enabled.

        Does nothing when ``settings.match_async_enabled`` is falsy or when a
        worker thread is already alive.
        """
        if not self.settings.match_async_enabled:
            return
        # stop() leaves the event set; without clearing it a restarted worker
        # would fall straight out of its loop and never process anything.
        self._stop_event.clear()
        if self._thread and self._thread.is_alive():
            return
        self._thread = Thread(target=self._run, daemon=True, name="match-queue-worker")
        self._thread.start()
        logger.info("match queue worker started")

    def stop(self) -> None:
        """Signal the worker to exit and wait up to 3 seconds for it."""
        self._stop_event.set()
        if self._thread and self._thread.is_alive():
            self._thread.join(timeout=3)

    def enqueue_workers(self, job_id: str, top_n: int) -> str:
        """Queue a "workers" task for ``job_id``; return the new task id."""
        return self._enqueue("workers", job_id, top_n)

    def enqueue_jobs(self, worker_id: str, top_n: int) -> str:
        """Queue a "jobs" task for ``worker_id``; return the new task id."""
        return self._enqueue("jobs", worker_id, top_n)

    def task_status(self, task_id: str) -> str:
        """Return the recorded status of ``task_id`` or ``"not_found"``."""
        with self._lock:
            return self._status.get(task_id, "not_found")

    def task_result(self, task_id: str) -> list[dict[str, Any]] | None:
        """Return the stored result list for ``task_id``, or ``None`` if absent."""
        with self._lock:
            return self._results.get(task_id)

    def stats(self) -> dict[str, int]:
        """Return the current queue size plus processed/failed counters."""
        with self._lock:
            return {
                "queued": self.queue.qsize(),
                "processed": self._processed,
                "failed": self._failed,
            }

    def _enqueue(self, kind: str, source_id: str, top_n: int) -> str:
        """Register a task as queued and put it on the queue without blocking.

        Raises:
            RuntimeError: if the queue is already at capacity.
        """
        task_id = generate_id("mq")
        task = MatchTask(task_id=task_id, kind=kind, source_id=source_id, top_n=top_n)
        # Record "queued" before the put so the worker's "processing" update
        # can never be overwritten by a late "queued" write.
        with self._lock:
            self._status[task_id] = "queued"
        try:
            self.queue.put_nowait(task)
        except Full as exc:
            # The id is never handed to the caller, so keeping a status entry
            # for it would only leak memory while the queue is saturated.
            with self._lock:
                self._status.pop(task_id, None)
            raise RuntimeError("match queue is full") from exc
        return task_id

    def _run(self) -> None:
        """Worker loop: pull tasks until the stop event is set."""
        while not self._stop_event.is_set():
            try:
                # Short timeout so the stop event is re-checked regularly.
                task = self.queue.get(timeout=0.5)
            except Empty:
                continue
            try:
                self._process(task)
            finally:
                self.queue.task_done()

    def _process(self, task: MatchTask) -> None:
        """Run one task and record its outcome; failures are logged, not raised."""
        try:
            with self._lock:
                self._status[task.task_id] = "processing"
            payload = self._compute_matches(task)
            # Publish status, result and counter together so a reader never
            # observes "done" without the matching result.
            with self._lock:
                self._status[task.task_id] = "done"
                self._results[task.task_id] = payload
                self._processed += 1
        except Exception:
            logger.exception("match queue task failed task_id=%s kind=%s", task.task_id, task.kind)
            with self._lock:
                self._status[task.task_id] = "failed"
                self._failed += 1

    def _compute_matches(self, task: MatchTask) -> list[dict[str, Any]]:
        """Load the task's source record, run the matcher, return JSON-mode dicts.

        Raises:
            ValueError: if the source record is missing or the kind is unknown.
        """
        with SessionLocal() as db:
            service = MatchingService(db)
            if task.kind == "workers":
                job = JobRepository(db).get(task.source_id)
                if job is None:
                    raise ValueError("job not found")
                items = service.match_workers(job_to_card(job), task.top_n)
            elif task.kind == "jobs":
                worker = WorkerRepository(db).get(task.source_id)
                if worker is None:
                    raise ValueError("worker not found")
                items = service.match_jobs(worker_to_card(worker), task.top_n)
            else:
                raise ValueError(f"unknown task kind {task.kind}")
        # Serialise outside the lock so status/stat readers are not blocked.
        return [item.model_dump(mode="json") for item in items]