122 lines
4.5 KiB
Python
122 lines
4.5 KiB
Python
from __future__ import annotations
|
|
|
|
from dataclasses import dataclass
|
|
from queue import Empty, Full, Queue
|
|
from threading import Event, Lock, Thread
|
|
from typing import Any
|
|
|
|
from app.core.config import Settings
|
|
from app.core.logging import logger
|
|
from app.db.session import SessionLocal
|
|
from app.domain.schemas import MatchResult
|
|
from app.repositories.job_repository import JobRepository
|
|
from app.repositories.worker_repository import WorkerRepository
|
|
from app.services.card_mapper import job_to_card, worker_to_card
|
|
from app.services.matching_service import MatchingService
|
|
from app.utils.ids import generate_id
|
|
|
|
|
|
@dataclass
class MatchTask:
    """A single matching request waiting in (or taken from) ``MatchQueue``.

    Fields are positional in the order ``task_id``, ``kind``, ``source_id``,
    ``top_n``.
    """

    # Identifier returned to the submitter and used to look up status/results.
    task_id: str
    # Direction of the match: "workers" (find workers for a job) or
    # "jobs" (find jobs for a worker); any other value fails when processed.
    kind: str
    # Id of the job (kind "workers") or worker (kind "jobs") to match from.
    source_id: str
    # Number of matches to request from the matching service.
    top_n: int
|
|
|
|
|
|
class MatchQueue:
    """In-process queue that runs match tasks on one background daemon thread.

    Callers submit work with :meth:`enqueue_workers` / :meth:`enqueue_jobs`
    and get back a task id. The worker thread (started by :meth:`start` when
    ``settings.match_async_enabled`` is true) pops tasks, runs them through
    ``MatchingService`` and records a status per task id:
    ``queued`` -> ``processing`` -> ``done`` or ``failed``; ``rejected`` when
    the queue was full at submit time. Successful tasks also store their
    JSON-serialized results. Status/result dicts and counters are guarded by
    ``self._lock``.

    NOTE(review): ``_status`` and ``_results`` are never pruned, so memory grows
    with every task submitted over the process lifetime; consider evicting
    finished entries after they are read or after a TTL.
    """

    def __init__(self, settings: Settings):
        self.settings = settings
        # Capacity comes from settings (per the stdlib, maxsize <= 0 means
        # unbounded); put_nowait() in _enqueue turns a full queue into an error.
        self.queue: Queue[MatchTask] = Queue(maxsize=settings.match_queue_max_size)
        self._stop_event = Event()
        self._thread: Thread | None = None
        self._lock = Lock()
        self._status: dict[str, str] = {}
        self._results: dict[str, list[dict[str, Any]]] = {}
        self._processed = 0
        self._failed = 0

    def start(self) -> None:
        """Start the background worker if async matching is enabled.

        No-op when ``settings.match_async_enabled`` is false or a worker thread
        is already alive. May be called again after :meth:`stop` to restart.
        """
        if not self.settings.match_async_enabled:
            return
        # stop() leaves the event set; without clearing it a restarted worker
        # would exit on its first loop check while we still log "started".
        # Clearing before the liveness check also lets a worker whose stop()
        # join timed out keep serving the queue instead of exiting afterwards.
        self._stop_event.clear()
        if self._thread is not None and self._thread.is_alive():
            return
        self._thread = Thread(target=self._run, daemon=True, name="match-queue-worker")
        self._thread.start()
        logger.info("match queue worker started")

    def stop(self, timeout: float = 3.0) -> None:
        """Ask the worker to exit and wait up to ``timeout`` seconds for it.

        The worker only checks the stop flag between tasks (at least every
        0.5 s while idle), so a task already in progress is finished first.
        Tasks still waiting in the queue keep their ``queued`` status.
        """
        self._stop_event.set()
        if self._thread is not None and self._thread.is_alive():
            self._thread.join(timeout=timeout)

    def enqueue_workers(self, job_id: str, top_n: int) -> str:
        """Queue a search for workers matching ``job_id``; return the task id.

        Raises:
            RuntimeError: if the queue is full.
        """
        return self._enqueue("workers", job_id, top_n)

    def enqueue_jobs(self, worker_id: str, top_n: int) -> str:
        """Queue a search for jobs matching ``worker_id``; return the task id.

        Raises:
            RuntimeError: if the queue is full.
        """
        return self._enqueue("jobs", worker_id, top_n)

    def task_status(self, task_id: str) -> str:
        """Return the recorded status of ``task_id``, or ``"not_found"``."""
        with self._lock:
            return self._status.get(task_id, "not_found")

    def task_result(self, task_id: str) -> list[dict[str, Any]] | None:
        """Return the serialized results of a ``done`` task, otherwise ``None``."""
        with self._lock:
            return self._results.get(task_id)

    def stats(self) -> dict[str, int]:
        """Return queue depth (approximate, per ``Queue.qsize``) and outcome counters."""
        with self._lock:
            return {
                "queued": self.queue.qsize(),
                "processed": self._processed,
                "failed": self._failed,
            }

    def _enqueue(self, kind: str, source_id: str, top_n: int) -> str:
        """Create a task of ``kind`` and put it on the queue without blocking.

        NOTE(review): tasks are accepted even when async matching is disabled,
        in which case nothing processes them; confirm callers check the setting.

        Raises:
            RuntimeError: if the queue is full (the task is marked ``rejected``).
        """
        task_id = generate_id("mq")
        task = MatchTask(task_id=task_id, kind=kind, source_id=source_id, top_n=top_n)
        # Record the status before the put so the worker can never observe a
        # task whose status is missing.
        with self._lock:
            self._status[task_id] = "queued"
        try:
            self.queue.put_nowait(task)
        except Full as exc:
            with self._lock:
                self._status[task_id] = "rejected"
            raise RuntimeError("match queue is full") from exc
        return task_id

    def _run(self) -> None:
        """Worker loop: process queued tasks until the stop event is set."""
        while not self._stop_event.is_set():
            try:
                # Short timeout so the stop flag is re-checked while idle.
                task = self.queue.get(timeout=0.5)
            except Empty:
                continue
            try:
                with self._lock:
                    self._status[task.task_id] = "processing"
                payload = self._execute(task)
            except Exception:
                # Top-level boundary of the worker thread: log and record the
                # failure, then keep serving the remaining tasks.
                logger.exception("match queue task failed task_id=%s kind=%s", task.task_id, task.kind)
                with self._lock:
                    self._status[task.task_id] = "failed"
                    self._failed += 1
            else:
                with self._lock:
                    self._status[task.task_id] = "done"
                    self._results[task.task_id] = payload
                    self._processed += 1
            finally:
                self.queue.task_done()

    def _execute(self, task: MatchTask) -> list[dict[str, Any]]:
        """Run one task in a fresh DB session and return JSON-ready match dicts.

        Raises:
            ValueError: if the source job/worker does not exist or the task
                kind is unknown.
        """
        with SessionLocal() as db:
            service = MatchingService(db)
            if task.kind == "workers":
                job = JobRepository(db).get(task.source_id)
                if job is None:
                    raise ValueError("job not found")
                items = service.match_workers(job_to_card(job), task.top_n)
            elif task.kind == "jobs":
                worker = WorkerRepository(db).get(task.source_id)
                if worker is None:
                    raise ValueError("worker not found")
                items = service.match_jobs(worker_to_card(worker), task.top_n)
            else:
                raise ValueError(f"unknown task kind {task.kind}")
            # Serialize here so the worker loop stores only plain data and does
            # this work outside the lock.
            return [item.model_dump(mode="json") for item in items]
|