426 lines
19 KiB
Python
426 lines
19 KiB
Python
from __future__ import annotations
|
|
|
|
import json
|
|
import re
|
|
from collections import Counter
|
|
from datetime import datetime, timedelta, timezone
|
|
from pathlib import Path
|
|
|
|
from dateutil import parser as date_parser
|
|
from pydantic import ValidationError
|
|
|
|
from app.core.config import get_settings
|
|
from app.core.logging import logger
|
|
from app.domain.schemas import ExtractResponse, JobCard, Salary, SkillScore, WorkerCard
|
|
from app.services.llm_client import LLMClient
|
|
from app.utils.ids import generate_id
|
|
from app.utils.prompts import load_prompt
|
|
|
|
|
|
class ExtractionService:
    """Extract structured job and worker cards from free-form (Chinese) text.

    Each public ``extract_*`` method first asks the LLM for a card, with
    schema-aware repair retries when the output fails validation. If that yields
    nothing valid, a rule-based extractor is used instead; its vocabularies and
    fallback values are derived from the JSON sample files found in
    ``settings.sample_data_dir``.
    """

    # --- Rule-extractor patterns ---------------------------------------------
    # Amount followed by a currency word, e.g. "150元" / "20块".
    _SALARY_PATTERN = r"(\d+(?:\.\d+)?)\s*(?:元|块)"
    # Head count such as "3人" / "2名" / "5个人". The lookahead keeps durations
    # like "3个小时" (and "2个月") from being read as a head count.
    _HEADCOUNT_PATTERN = r"(\d+)\s*[个名人位](?!小时|月)"
    # Duration such as "4小时", "4.5 小时" or "3个小时".
    _DURATION_PATTERN = r"(\d+(?:\.\d+)?)\s*个?\s*小时"
    # Substrings that mark a quoted salary as per-hour instead of per-day.
    _HOURLY_MARKERS = ("/小时", "／小时", "每小时", "时薪")
    # (day offset, pattern) for relative day words. "今晚"/"明晚" also carry an
    # evening period, which _time_candidates preserves. The lookbehind keeps
    # "大后天" (+3 days) from also matching the "后天" (+2 days) entry.
    _RELATIVE_DAY_PATTERNS = (
        (0, r"今天|今日|今晚"),
        (1, r"明天|明日|明晚"),
        (2, r"(?<!大)后天"),
        (3, r"大后天"),
    )
    # Optional week qualifier followed by a weekday, e.g. "周六", "下周一", "星期天".
    _WEEKDAY_PATTERN = r"(下周|下星期|本周|本星期|这周|这星期|星期|周)([一二三四五六日天])"
    # Weekday character -> datetime.weekday() value (Monday == 0).
    _WEEKDAYS = {"一": 0, "二": 1, "三": 2, "四": 3, "五": 4, "六": 5, "日": 6, "天": 6}
    # Day-period words and the clock time assumed when no explicit hour follows.
    _PERIOD_DEFAULT_TIMES = {
        "今晚": "19:00",
        "晚上": "19:00",
        "下午": "14:00",
        "中午": "12:00",
        "早上": "09:00",
        "上午": "09:00",
        "凌晨": "01:00",
    }
    _PERIOD_PATTERN = r"今晚|晚上|下午|中午|早上|上午|凌晨"
    # A day-period word directly followed by an hour ("下午3点", "晚上8:30").
    _PERIOD_HOUR_PATTERN = r"(今晚|晚上|下午|中午|早上|上午|凌晨)\s*(\d{1,2})(?=\s*[点时:：])"

    def __init__(self) -> None:
        """Load settings and sample data, then precompute the rule-extractor defaults."""
        self.settings = get_settings()
        self.skills = self._load_sample("skills.json")
        self.categories = self._load_sample("categories.json")
        self.regions = self._load_sample("regions.json")
        self.sample_jobs = self._load_sample("jobs.json")
        self.sample_workers = self._load_sample("workers.json")
        # The builders below read the sample data loaded above, so order matters.
        self.default_region = self._build_default_region()
        self.default_category = self._build_default_category()
        self.default_salary_amount = self._build_default_salary_amount()
        self.default_job_tags = self._build_default_job_tags()
        self.default_worker_skills = self._build_default_worker_skills()
        self.default_experience_tags = self._build_default_experience_tags()
        self.category_skill_defaults = self._build_category_skill_defaults()
        self.city_region_defaults = self._build_city_region_defaults()
        self.tag_candidates = self._build_tag_candidates()
        self.llm_client = LLMClient(self.settings)
        self.shanghai_tz = timezone(timedelta(hours=8))

    def _load_sample(self, filename: str):
        """Parse the UTF-8 JSON file ``filename`` from ``settings.sample_data_dir``."""
        return json.loads((self.settings.sample_data_dir / filename).read_text(encoding="utf-8"))

    def extract_job(self, text: str) -> ExtractResponse:
        """Extract a ``JobCard`` from a job posting (LLM first, rules as fallback)."""
        return self._extract_card(text, "job", "job_extract.md", JobCard, self._extract_job_rule)

    def extract_worker(self, text: str) -> ExtractResponse:
        """Extract a ``WorkerCard`` from a worker self-description (LLM first, rules as fallback)."""
        return self._extract_card(text, "worker", "worker_extract.md", WorkerCard, self._extract_worker_rule)

    def _extract_card(self, text: str, kind: str, prompt_name: str, schema_cls, rule_extractor) -> ExtractResponse:
        """Shared flow behind ``extract_job`` / ``extract_worker``.

        Args:
            text: Raw user text.
            kind: ``"job"`` or ``"worker"``; only used in log messages.
            prompt_name: Prompt file name inside ``settings.prompt_dir``.
            schema_cls: Pydantic model the LLM output must validate against.
            rule_extractor: Callable ``text -> card`` used when the LLM yields nothing valid.

        Returns:
            A successful response carrying the card, or a failed response listing
            the validation errors raised by the rule extractor.
        """
        # The text may contain personal data (names, contact details), so only
        # its length is logged at INFO level.
        logger.info("extract_%s request chars=%d", kind, len(text))
        logger.debug("extract_%s request text=%s", kind, text)
        llm_card = self._llm_extract_with_retry(text, self.settings.prompt_dir / prompt_name, schema_cls)
        if llm_card is not None:
            return ExtractResponse(success=True, data=llm_card)

        try:
            card = rule_extractor(text)
        except ValidationError as exc:
            logger.exception("Rule %s extraction validation failed", kind)
            return ExtractResponse(success=False, errors=[str(exc)], missing_fields=self._missing_fields(exc))
        return ExtractResponse(success=True, data=card)

    def _llm_extract(self, text: str, prompt_path: Path):
        """Call the LLM with the prompt at ``prompt_path``; return its result or ``None`` on any error."""
        try:
            return self.llm_client.extract_json(load_prompt(prompt_path), text)
        except Exception:
            # Best-effort: any LLM / prompt failure falls back to rule-based extraction.
            logger.exception("LLM extraction failed, fallback to rule-based extraction")
            return None

    def _llm_extract_with_retry(self, text: str, prompt_path: Path, schema_cls):
        """Run LLM extraction and, if its output fails validation, ask the LLM to repair it.

        Args:
            text: Raw user text.
            prompt_path: Prompt file for the first attempt; its text is also the
                base of every repair prompt.
            schema_cls: Pydantic model class the output must validate against.

        Returns:
            A validated ``schema_cls`` instance, or ``None`` when the LLM is
            unavailable, a call fails, or no attempt validates within
            ``settings.extraction_llm_max_retries`` repair rounds.
        """
        llm_result = self._llm_extract(text, prompt_path)
        if not llm_result:
            return None

        try:
            # model_validate (rather than schema_cls(**content)) reports a
            # non-dict payload as a ValidationError instead of raising TypeError.
            return schema_cls.model_validate(llm_result.content)
        except ValidationError as exc:
            logger.warning("LLM extraction validation failed, trying schema-aware retry")
            last_error = exc
        last_output = llm_result.content

        max_retries = self.settings.extraction_llm_max_retries
        if max_retries <= 0:
            return None
        try:
            # Guarded so an unreadable prompt means "fall back to rules" rather
            # than an unhandled exception in the request.
            base_prompt = load_prompt(prompt_path)
        except Exception:
            logger.exception("LLM schema-aware retry failed")
            return None

        for _ in range(max_retries):
            missing_fields = self._missing_fields(last_error)
            repair_prompt = self._build_repair_prompt(base_prompt, schema_cls, missing_fields)
            try:
                repair_result = self.llm_client.extract_json(
                    repair_prompt,
                    self._build_repair_input(text, last_output, missing_fields),
                )
            except Exception:
                logger.exception("LLM schema-aware retry failed")
                return None
            if not repair_result:
                return None

            last_output = repair_result.content
            try:
                return schema_cls.model_validate(repair_result.content)
            except ValidationError as exc:
                last_error = exc
                logger.warning("LLM schema-aware retry still invalid missing_fields=%s", self._missing_fields(exc))
        return None

    def _build_repair_prompt(self, base_prompt: str, schema_cls, missing_fields: list[str]) -> str:
        """Append repair instructions, the failing field paths and the JSON schema to ``base_prompt``."""
        schema_json = json.dumps(schema_cls.model_json_schema(), ensure_ascii=False)
        return (
            f"{base_prompt}\n\n"
            "你是结构化修复助手。请严格输出可被 JSON 解析的对象,不要输出解释文字。\n"
            "目标是根据给定 schema 修复字段缺失和类型错误,优先保证必填字段完整。\n"
            f"缺失或错误字段: {', '.join(missing_fields) if missing_fields else 'unknown'}\n"
            f"JSON Schema: {schema_json}\n"
        )

    def _build_repair_input(self, original_text: str, last_output: object, missing_fields: list[str]) -> str:
        """Build the user message for a repair round: original text, previous output and fields to fix."""
        return (
            f"原始文本:\n{original_text}\n\n"
            f"上一次抽取结果:\n{json.dumps(last_output, ensure_ascii=False)}\n\n"
            f"请重点修复字段:\n{json.dumps(missing_fields, ensure_ascii=False)}"
        )

    def _extract_job_rule(self, text: str) -> JobCard:
        """Build a ``JobCard`` from ``text`` using keyword / regex rules and sample-data defaults.

        Raises:
            ValidationError: if the assembled card fails ``JobCard`` validation.
        """
        skill_hits = [item for item in self.skills if item in text]
        category = next((item for item in self.categories if item in text), self.default_category)
        matched_region = self._match_region(text)
        region = matched_region or self.default_region
        salary = self._extract_salary(text)
        # Whether a salary was actually stated (salary.amount is always > 0
        # because of the default, so it cannot be used as the signal).
        salary_found = re.search(self._SALARY_PATTERN, text) is not None
        headcount = self._extract_number(text, [self._HEADCOUNT_PATTERN], default=1)
        duration = self._extract_number(text, [self._DURATION_PATTERN], default=4.0, cast=float)
        tags = [tag for tag in self.tag_candidates if tag in text][:3]
        title = f"{category}{skill_hits[0]}兼职" if skill_hits else f"{category}兼职"

        return JobCard(
            job_id=generate_id("job"),
            title=title,
            category=category,
            description=text,
            skills=skill_hits[:5] or self._guess_category_skills(category),
            city=region["city"],
            region=region["region"],
            location_detail=self._extract_location(text, region),
            start_time=self._extract_job_time(text),
            duration_hours=duration,
            headcount=int(headcount),
            salary=salary,
            work_mode="排班制" if "排班" in text else "兼职",
            tags=tags or list(self.default_job_tags),
            # Only reward signals found in the text, not filled-in defaults.
            confidence=self._compute_confidence(skill_hits, matched_region or {}, salary_found),
        )

    def _extract_worker_rule(self, text: str) -> WorkerCard:
        """Build a ``WorkerCard`` from ``text`` using keyword rules and sample-data defaults.

        Raises:
            ValidationError: if the assembled card fails ``WorkerCard`` validation.
        """
        skill_hits = [item for item in self.skills if item in text][:6]
        cities, regions, location_found = self._extract_worker_locations(text)
        availability = self._extract_availability(text)
        experience = [item for item in self.default_experience_tags if item in text]
        location_signal = {"city": cities[0], "region": regions[0]} if location_found else {}

        return WorkerCard(
            worker_id=generate_id("worker"),
            name=self._extract_name(text),
            description=text,
            skills=[
                SkillScore(name=item, score=round(0.72 + index * 0.04, 2))
                for index, item in enumerate(skill_hits or self.default_worker_skills)
            ],
            cities=cities,
            regions=regions,
            availability=availability,
            experience_tags=experience or self.default_experience_tags[:2],
            reliability_score=0.76,
            profile_completion=0.68,
            confidence=self._compute_confidence(skill_hits, location_signal, True),
        )

    def _extract_worker_locations(self, text: str) -> tuple[list[str], list[str], bool]:
        """Collect the cities / districts a worker mentions.

        Districts named in the text are used directly. Each mentioned city that
        has no named district contributes its most common district from
        ``city_region_defaults``.

        Returns:
            ``(cities, regions, found)``: de-duplicated city and district names
            in first-seen order (falling back to ``default_region`` when nothing
            matched), and whether anything was found in the text.
        """
        hits = [item for item in self.regions if item["region"] in text]
        covered_cities = {item["city"] for item in hits}
        mentioned_cities = dict.fromkeys(item["city"] for item in self.regions if item["city"] in text)
        for city in mentioned_cities:
            if city not in covered_cities:
                hits.append({"city": city, "region": self.city_region_defaults.get(city, self.default_region["region"])})

        cities = list(dict.fromkeys(item["city"] for item in hits)) or [self.default_region["city"]]
        regions = list(dict.fromkeys(item["region"] for item in hits)) or [self.default_region["region"]]
        return cities, regions, bool(hits)

    def _extract_region(self, text: str) -> dict:
        """Return the region mentioned in ``text``, or ``default_region`` when none matches."""
        return self._match_region(text) or self.default_region

    def _match_region(self, text: str) -> dict | None:
        """Find the region mentioned in ``text``.

        Tries, in order: city and district both present; district only; city
        only (mapped to that city's default district). Returns ``None`` when
        nothing matches.
        """
        for item in self.regions:
            if item["city"] in text and item["region"] in text:
                return item
        for item in self.regions:
            if item["region"] in text:
                return item
        city_match = next((item["city"] for item in self.regions if item["city"] in text), "")
        if city_match:
            return {"city": city_match, "region": self.city_region_defaults.get(city_match, self.default_region["region"])}
        return None

    def _extract_location(self, text: str, region: dict) -> str:
        """Compose a location string from ``region`` plus the first venue marker found in ``text``."""
        markers = ["会展中心", "商场", "地铁站", "园区", "写字楼", "仓库", "门店"]
        for marker in markers:
            if marker in text:
                return f"{region['city']}{region['region']}{marker}"
        return f"{region['city']}{region['region']}待定点位"

    def _extract_salary(self, text: str) -> Salary:
        """Parse the first "<number>元/块" amount (default: sample median) and the pay period."""
        amount = self._extract_number(text, [self._SALARY_PATTERN], default=self.default_salary_amount, cast=float)
        return Salary(type=self._salary_type(text), amount=amount, currency="CNY")

    def _salary_type(self, text: str) -> str:
        """Return ``"hourly"`` if ``text`` contains an hourly-pay marker, else ``"daily"``."""
        return "hourly" if any(marker in text for marker in self._HOURLY_MARKERS) else "daily"

    def _extract_number(self, text: str, patterns: list[str], default, cast=int):
        """Return ``cast(group 1)`` of the first pattern that matches ``text``, else ``default``."""
        for pattern in patterns:
            match = re.search(pattern, text)
            if match:
                return cast(match.group(1))
        return default

    def _extract_job_time(self, text: str) -> datetime:
        """Parse a start time from ``text``; default to this time tomorrow (UTC+8, seconds dropped)."""
        now = datetime.now(self.shanghai_tz)
        for candidate in self._time_candidates(text, now):
            parsed = self._parse_datetime(candidate, now)
            if parsed:
                return parsed
        return self._normalize_datetime(now + timedelta(days=1))

    def _time_candidates(self, text: str, now: datetime) -> list[str]:
        """Return ``text`` plus variants with relative day / weekday words replaced by YYYY-MM-DD dates.

        One variant is produced per relative-day group found (today, tomorrow,
        +2, +3 days) and one for the first weekday expression, in that order.
        """
        candidates = [text]

        for offset, pattern in self._RELATIVE_DAY_PATTERNS:
            if not re.search(pattern, text):
                continue
            day = (now + timedelta(days=offset)).strftime("%Y-%m-%d")
            # "今晚"/"明晚" name both a day and the evening; keep the period as
            # "晚上" so _replace_time_words can still turn it into a time.
            candidates.append(
                re.sub(pattern, lambda match, day=day: day + (" 晚上" if match.group(0).endswith("晚") else ""), text)
            )

        week_match = re.search(self._WEEKDAY_PATTERN, text)
        if week_match:
            prefix, weekday_token = week_match.groups()
            days_ahead = (self._WEEKDAYS[weekday_token] - now.weekday()) % 7
            if prefix in ("下周", "下星期"):
                days_ahead += 7
            elif prefix in ("周", "星期") and days_ahead == 0:
                # A bare "周X" naming today's weekday means next week's X.
                days_ahead = 7
            target_day = now + timedelta(days=days_ahead)
            candidates.append(text.replace(week_match.group(0), target_day.strftime("%Y-%m-%d")))

        return candidates

    def _parse_datetime(self, text: str, now: datetime) -> datetime | None:
        """Parse a datetime from ``text`` if it contains a YYYY-MM-DD date after normalisation.

        Chinese time words, "X月Y日/号" dates and "N点(半)"/"N时" hours are first
        rewritten into forms ``dateutil`` understands. Returns ``None`` when no
        date is present or parsing fails.
        """
        cleaned = self._replace_time_words(text)
        cleaned = re.sub(r"[,，、。;；]", " ", cleaned)
        cleaned = cleaned.replace("号", "日").replace("：", ":")
        cleaned = re.sub(
            r"(\d{1,2})月(\d{1,2})日",
            lambda match: self._month_day_to_iso(int(match.group(1)), int(match.group(2)), now),
            cleaned,
        )
        cleaned = re.sub(r"(\d{1,2})\s*点半", r"\1:30", cleaned)
        cleaned = re.sub(r"(\d{1,2})\s*[点时]", r"\1:00", cleaned)

        if not re.search(r"\d{4}-\d{1,2}-\d{1,2}", cleaned):
            return None
        try:
            parsed = date_parser.parse(cleaned, fuzzy=True)
        except Exception:
            # Best-effort parse of free text: any parser failure means "no usable date".
            return None
        return self._normalize_datetime(parsed)

    def _month_day_to_iso(self, month: int, day: int, now: datetime) -> str:
        """Format a month/day as "YYYY-M-D", rolling into next year if the date is already past.

        Invalid dates (e.g. 2月30日) keep the current year and are left for the
        date parser to reject.
        """
        year = now.year
        try:
            if datetime(year, month, day).date() < now.date():
                year += 1
        except ValueError:
            pass
        return f"{year}-{month}-{day}"

    def _replace_time_words(self, text: str) -> str:
        """Rewrite day-period words into clock times.

        A period followed by an explicit hour adjusts that hour (下午3点 -> 15点,
        中午1点 -> 13点). A bare period becomes its default time (晚上 -> 19:00).
        """
        def shift_hour(match: re.Match) -> str:
            period, hour = match.group(1), int(match.group(2))
            if period in ("今晚", "晚上", "下午") and hour < 12:
                hour += 12
            elif period == "中午" and hour <= 5:
                hour += 12
            return f" {hour}"

        with_hours = re.sub(self._PERIOD_HOUR_PATTERN, shift_hour, text)
        return re.sub(self._PERIOD_PATTERN, lambda match: f" {self._PERIOD_DEFAULT_TIMES[match.group(0)]} ", with_hours)

    def _normalize_datetime(self, value: datetime) -> datetime:
        """Attach/convert to UTC+8 (naive values are assumed to be UTC+8) and drop seconds."""
        if value.tzinfo is None:
            value = value.replace(tzinfo=self.shanghai_tz)
        else:
            value = value.astimezone(self.shanghai_tz)
        return value.replace(second=0, microsecond=0)

    def _extract_availability(self, text: str) -> list[str]:
        """Map availability keywords to tags; default to ``["anytime"]``."""
        tags = []
        if "周末" in text:
            tags.append("weekend")
        if "上午" in text:
            tags.append("weekday_am")
        if "下午" in text:
            tags.append("weekday_pm")
        if "随时" in text or "都能" in text or "全天" in text:
            tags.append("anytime")
        return tags or ["anytime"]

    def _extract_name(self, text: str) -> str:
        """Take 2-4 CJK characters after "我叫" or "我是" as the name, else "匿名候选人"."""
        if match := re.search(r"我叫([\u4e00-\u9fa5]{2,4})", text):
            return match.group(1)
        if match := re.search(r"我是([\u4e00-\u9fa5]{2,4})", text):
            return match.group(1)
        return "匿名候选人"

    def _guess_category_skills(self, category: str) -> list[str]:
        """Return the most common sample skills for ``category``, or the first 3 default worker skills."""
        skills = self.category_skill_defaults.get(category)
        if skills:
            # Copy so the card does not alias the shared defaults list.
            return list(skills)
        return self.default_worker_skills[:3]

    def _compute_confidence(self, skill_hits: list[str], region: dict, has_salary: bool) -> float:
        """Heuristic score: 0.55 base, +0.15 skills, +0.15 city, +0.1 salary, capped at 0.95."""
        score = 0.55
        if skill_hits:
            score += 0.15
        if region.get("city"):
            score += 0.15
        if has_salary:
            score += 0.1
        return min(round(score, 2), 0.95)

    def _missing_fields(self, exc: ValidationError) -> list[str]:
        """Return dotted field paths of the validation errors (``"__root__"`` for model-level errors)."""
        return [".".join(str(part) for part in item["loc"]) or "__root__" for item in exc.errors()]

    def _build_default_region(self) -> dict:
        """Most frequent (city, region) pair in sample jobs, else the first region, else 深圳/南山."""
        if self.sample_jobs:
            pair_counter = Counter(
                (item.get("city"), item.get("region"))
                for item in self.sample_jobs
                if item.get("city") and item.get("region")
            )
            if pair_counter:
                city, region = pair_counter.most_common(1)[0][0]
                return {"city": city, "region": region}
        if self.regions:
            return {"city": self.regions[0]["city"], "region": self.regions[0]["region"]}
        return {"city": "深圳", "region": "南山"}

    def _build_default_category(self) -> str:
        """Most frequent sample-job category, else the first known category, else "活动执行"."""
        counter = Counter(item.get("category") for item in self.sample_jobs if item.get("category"))
        if counter:
            return counter.most_common(1)[0][0]
        return self.categories[0] if self.categories else "活动执行"

    def _build_default_salary_amount(self) -> float:
        """Median numeric salary amount across sample jobs, or 150.0 when none is available."""
        amounts = sorted(
            float(item["salary"]["amount"])
            for item in self.sample_jobs
            if isinstance(item.get("salary"), dict) and isinstance(item["salary"].get("amount"), (int, float))
        )
        if not amounts:
            return 150.0
        mid = len(amounts) // 2
        if len(amounts) % 2 == 1:
            return amounts[mid]
        return round((amounts[mid - 1] + amounts[mid]) / 2, 2)

    def _build_default_job_tags(self) -> list[str]:
        """Up to 3 most common non-blank sample-job tags, else ``["有经验优先"]``."""
        counter = Counter(
            tag
            for item in self.sample_jobs
            for tag in item.get("tags", [])
            if isinstance(tag, str) and tag.strip()
        )
        top_tags = [tag for tag, _ in counter.most_common(3)]
        return top_tags or ["有经验优先"]

    def _build_default_worker_skills(self) -> list[str]:
        """Up to 4 most common skill names across sample workers, else a fixed fallback list."""
        counter = Counter(
            skill.get("name")
            for item in self.sample_workers
            for skill in item.get("skills", [])
            if isinstance(skill, dict) and isinstance(skill.get("name"), str) and skill.get("name")
        )
        top_skills = [name for name, _ in counter.most_common(4)]
        return top_skills or ["活动执行", "引导", "登记"]

    def _build_default_experience_tags(self) -> list[str]:
        """Up to 5 most common non-blank worker experience tags, else ``["活动执行"]``."""
        counter = Counter(
            tag
            for item in self.sample_workers
            for tag in item.get("experience_tags", [])
            if isinstance(tag, str) and tag.strip()
        )
        top_tags = [tag for tag, _ in counter.most_common(5)]
        return top_tags or ["活动执行"]

    def _build_category_skill_defaults(self) -> dict[str, list[str]]:
        """Map each sample-job category to its (up to) 4 most common skills."""
        category_skills: dict[str, Counter] = {}
        for item in self.sample_jobs:
            category = item.get("category")
            if not isinstance(category, str) or not category:
                continue
            counter = category_skills.setdefault(category, Counter())
            for skill in item.get("skills", []):
                if isinstance(skill, str) and skill:
                    counter[skill] += 1
        return {category: [name for name, _ in counter.most_common(4)] for category, counter in category_skills.items()}

    def _build_city_region_defaults(self) -> dict[str, str]:
        """Map each city to its most likely district.

        Each entry in regions.json counts once; each sample job counts three
        times, so observed job locations dominate.
        """
        counter: dict[str, Counter] = {}
        for item in self.regions:
            city = item.get("city")
            region = item.get("region")
            if not city or not region:
                continue
            counter.setdefault(city, Counter())[region] += 1
        for item in self.sample_jobs:
            city = item.get("city")
            region = item.get("region")
            if city and region:
                counter.setdefault(city, Counter())[region] += 3

        defaults: dict[str, str] = {}
        for city, regions in counter.items():
            defaults[city] = regions.most_common(1)[0][0]
        return defaults

    def _build_tag_candidates(self) -> list[str]:
        """Sample-job tags (first-seen order) followed by baseline tags, de-duplicated, at most 30."""
        sample_tags = list(
            dict.fromkeys(
                tag
                for item in self.sample_jobs
                for tag in item.get("tags", [])
                if isinstance(tag, str) and tag.strip()
            )
        )
        baseline_tags = ["女生优先", "男生优先", "有经验优先", "沟通好", "可连做优先"]
        merged = list(dict.fromkeys([*sample_tags, *baseline_tags]))
        return merged[:30]
|