"""Two-way matching between job cards and worker cards.

Candidates are recalled through the LightRAG adapter (falling back to a plain
repository listing), scored on five dimensions, ranked, and persisted.
"""

from __future__ import annotations

from datetime import datetime

from sqlalchemy.orm import Session

from app.core.config import get_settings
from app.core.logging import logger
from app.domain.schemas import JobCard, MatchBreakdown, MatchResult, QueryFilters, SourceType, WorkerCard
from app.repositories.job_repository import JobRepository
from app.repositories.match_repository import MatchRepository
from app.repositories.worker_repository import WorkerRepository
from app.services.card_mapper import job_to_card, match_record_to_schema, worker_to_card
from app.services.rag.lightrag_adapter import LightRAGAdapter
from app.utils.ids import generate_id


class MatchingService:
    """Score and rank workers for a job, or jobs for a worker."""

    # Minimum number of rows requested from the repository when the RAG
    # search yields no usable candidates.
    _FALLBACK_CANDIDATE_FLOOR = 50

    # Appended until every result carries at least ``_MIN_REASONS`` reasons.
    _FILLER_REASON = "岗位需求与候选画像存在基础匹配"
    _MIN_REASONS = 3
    _MAX_REASONS = 5

    def __init__(self, db: Session):
        """Wire the repositories and the RAG adapter to the given session."""
        self.db = db
        self.settings = get_settings()
        self.jobs = JobRepository(db)
        self.workers = WorkerRepository(db)
        self.matches = MatchRepository(db)
        self.rag = LightRAGAdapter(self.settings)

    # ------------------------------------------------------------------
    # Public API
    # ------------------------------------------------------------------

    def match_workers(self, source: JobCard, top_n: int) -> list[MatchResult]:
        """Return the ``top_n`` best workers for ``source`` and persist them.

        Stored matches for this job are replaced via
        ``MatchRepository.bulk_replace``.
        """
        logger.info("match_workers source_id=%s top_n=%s", source.job_id, top_n)
        query_text = self._join_terms(
            [source.title, source.category, source.city, source.region, *source.skills, *source.tags]
        )
        candidate_ids = self.rag.search(
            query_text=query_text,
            filters=QueryFilters(entity_type="worker", city=source.city),
            limit=self._recall_limit(top_n),
        )
        # An empty lookup result falls back to a plain listing so the caller
        # still gets scored candidates.
        candidates = self.workers.get_many(candidate_ids) or self.workers.list(limit=self._fallback_limit(top_n))
        results = [self._build_job_to_worker_match(source, worker_to_card(worker)) for worker in candidates]
        return self._rank_and_store(results, top_n, SourceType.job_to_worker, source.job_id)

    def match_jobs(self, source: WorkerCard, top_n: int) -> list[MatchResult]:
        """Return the ``top_n`` best jobs for ``source`` and persist them.

        Only the worker's first city (if any) is used as the search filter.
        """
        logger.info("match_jobs source_id=%s top_n=%s", source.worker_id, top_n)
        query_text = self._join_terms(
            [
                source.name,
                *source.cities,
                *source.regions,
                *[item.name for item in source.skills],
                *source.experience_tags,
            ]
        )
        city = source.cities[0] if source.cities else None
        candidate_ids = self.rag.search(
            query_text=query_text,
            filters=QueryFilters(entity_type="job", city=city),
            limit=self._recall_limit(top_n),
        )
        candidates = self.jobs.get_many(candidate_ids) or self.jobs.list(limit=self._fallback_limit(top_n))
        results = [self._build_worker_to_job_match(source, job_to_card(job)) for job in candidates]
        return self._rank_and_store(results, top_n, SourceType.worker_to_job, source.worker_id)

    def explain(self, match_id: str) -> MatchResult | None:
        """Return the stored match for ``match_id``, or ``None`` if absent."""
        record = self.matches.get(match_id)
        if record is None:
            return None
        return match_record_to_schema(record)

    # ------------------------------------------------------------------
    # Shared pipeline helpers
    # ------------------------------------------------------------------

    def _join_terms(self, terms: list[str]) -> str:
        """Join the non-empty terms with single spaces.

        Skipping ``None``/empty values keeps ``str.join`` from raising and
        avoids stray double spaces in the query.
        """
        return " ".join(term for term in terms if term)

    def _recall_limit(self, top_n: int) -> int:
        """Number of candidate ids to request from the RAG search."""
        return max(top_n * 3, self.settings.default_recall_top_k)

    def _fallback_limit(self, top_n: int) -> int:
        """Number of rows to list when the RAG search gave no candidates."""
        return max(top_n * 3, self._FALLBACK_CANDIDATE_FLOOR)

    def _rank_and_store(
        self,
        results: list[MatchResult],
        top_n: int,
        source_type: SourceType,
        source_id: str,
    ) -> list[MatchResult]:
        """Sort by descending score, keep ``top_n``, persist, and return them."""
        ranked = sorted(results, key=lambda item: item.match_score, reverse=True)[:top_n]
        self.matches.bulk_replace(ranked, source_type.value, source_id)
        return ranked

    # ------------------------------------------------------------------
    # Match construction
    # ------------------------------------------------------------------

    def _build_job_to_worker_match(self, job: JobCard, worker: WorkerCard) -> MatchResult:
        """Score ``worker`` against ``job`` and build the match result."""
        job_skills = set(job.skills)
        # set() so .intersection works whatever iterable the adapter returns.
        expanded_skills = set(self.rag.expand_skills(job.skills))
        worker_skills = {item.name: item.score for item in worker.skills}
        direct_hits = job_skills.intersection(worker_skills.keys())
        expanded_hits = expanded_skills.intersection(worker_skills.keys())

        base_skill_score = sum(worker_skills[name] for name in expanded_hits) / max(len(job_skills), 1)
        if not direct_hits:
            # Only expanded (related) skills matched: discount the score.
            base_skill_score *= 0.4
        skill_score = min(base_skill_score, 1.0)

        region_score = self._region_score(job.city, job.region, worker.cities, worker.regions)
        time_score = self._time_score(job.start_time, worker.availability)
        experience_score = self._experience_score([job.category, *job.tags], worker.experience_tags)
        reliability_score = worker.reliability_score
        score = self._weighted_score(skill_score, region_score, time_score, experience_score, reliability_score)

        breakdown = MatchBreakdown(
            skill_score=round(skill_score, 2),
            region_score=round(region_score, 2),
            time_score=round(time_score, 2),
            experience_score=round(experience_score, 2),
            reliability_score=round(reliability_score, 2),
        )

        # Set iteration order for strings varies between processes, so order
        # the hits explicitly: directly requested skills first, then by name.
        matched_skills = sorted(expanded_hits, key=lambda name: (name not in direct_hits, name))[:3]

        # Report only experience the worker actually has, using the same
        # category + tags pool that _experience_score evaluates. Job order is
        # preserved and duplicates are dropped.
        worker_experience = set(worker.experience_tags)
        experience_hits = [
            tag for tag in dict.fromkeys([job.category, *job.tags]) if tag in worker_experience
        ][:2]

        reasons = self._build_reasons(
            matched_skills=matched_skills,
            region_hit=region_score,
            time_score=time_score,
            experience_hits=experience_hits,
            reliability_score=reliability_score,
            target_region=job.region,
        )
        return MatchResult(
            match_id=generate_id("match"),
            source_type=SourceType.job_to_worker,
            source_id=job.job_id,
            target_id=worker.worker_id,
            match_score=round(score, 2),
            breakdown=breakdown,
            reasons=reasons,
        )

    def _build_worker_to_job_match(self, worker: WorkerCard, job: JobCard) -> MatchResult:
        """Build the worker-to-job result by reusing the job-to-worker scoring.

        Score, breakdown and reasons are copied; source/target ids are swapped
        and a fresh match id is generated.
        """
        reverse = self._build_job_to_worker_match(job, worker)
        return MatchResult(
            match_id=generate_id("match"),
            source_type=SourceType.worker_to_job,
            source_id=worker.worker_id,
            target_id=job.job_id,
            match_score=reverse.match_score,
            breakdown=reverse.breakdown,
            reasons=reverse.reasons,
        )

    # ------------------------------------------------------------------
    # Per-dimension scores
    # ------------------------------------------------------------------

    def _region_score(
        self,
        job_city: str,
        job_region: str,
        worker_cities: list[str],
        worker_regions: list[str],
    ) -> float:
        """Return 1.0 on a region hit, 0.7 on a city-only hit, else 0.2."""
        if job_region in worker_regions:
            return 1.0
        if job_city in worker_cities:
            return 0.7
        return 0.2

    def _time_score(self, start_time: datetime, availability: list[str]) -> float:
        """Return 1.0 when the job's start slot is in ``availability``, else 0.4.

        The slot is ``"weekend"`` for Saturday/Sunday, otherwise
        ``"weekday_pm"`` from 12:00 onwards and ``"weekday_am"`` before that.
        ``"anytime"`` in ``availability`` always scores 1.0.
        """
        if "anytime" in availability:
            return 1.0
        if start_time.weekday() >= 5:  # Monday is 0, so 5 and 6 are the weekend.
            desired = "weekend"
        elif start_time.hour >= 12:
            desired = "weekday_pm"
        else:
            desired = "weekday_am"
        return 1.0 if desired in availability else 0.4

    def _experience_score(self, left: list[str], right: list[str]) -> float:
        """Return 0.4 plus the share of ``left`` found in ``right``, capped at 1.0.

        If either side is empty the score is the 0.4 baseline.
        """
        left_set = set(left)
        right_set = set(right)
        if not left_set or not right_set:
            return 0.4
        overlap = len(left_set.intersection(right_set))
        return min(overlap / max(len(left_set), 1) + 0.4, 1.0)

    def _weighted_score(
        self,
        skill_score: float,
        region_score: float,
        time_score: float,
        experience_score: float,
        reliability_score: float,
    ) -> float:
        """Combine the five dimension scores using the weights from settings."""
        return (
            self.settings.score_skill_weight * skill_score
            + self.settings.score_region_weight * region_score
            + self.settings.score_time_weight * time_score
            + self.settings.score_experience_weight * experience_score
            + self.settings.score_reliability_weight * reliability_score
        )

    # ------------------------------------------------------------------
    # Explanation text
    # ------------------------------------------------------------------

    def _build_reasons(
        self,
        matched_skills: list[str],
        region_hit: float,
        time_score: float,
        experience_hits: list[str],
        reliability_score: float,
        target_region: str,
    ) -> list[str]:
        """Build the human-readable reasons for a match.

        Returns between three and five strings; a generic filler sentence is
        appended until at least three are present.
        """
        reasons = []
        if matched_skills:
            reasons.append(f"具备{'、'.join(matched_skills[:3])}相关技能")
        if region_hit >= 1.0:
            reasons.append(f"服务区域覆盖{target_region},与岗位地点一致")
        elif region_hit >= 0.7:
            reasons.append("同城可到岗,区域匹配度较高")
        if time_score >= 1.0:
            reasons.append("可接单时间与岗位时间要求匹配")
        if experience_hits:
            reasons.append(f"具备{'、'.join(experience_hits[:2])}相关经验")
        if reliability_score >= 0.75:
            reasons.append("履约可信度较好,适合优先推荐")
        while len(reasons) < self._MIN_REASONS:
            reasons.append(self._FILLER_REASON)
        return reasons[: self._MAX_REASONS]