Files
Airtep/gig-poc/apps/api/app/services/llm_client.py
2026-04-01 14:19:25 +08:00

105 lines
3.6 KiB
Python

from __future__ import annotations
import json
import httpx
from app.core.config import Settings
from app.domain.schemas import PromptOutput
from app.services.ai_guard import AIGuard
class LLMClient:
    """OpenAI-compatible HTTP client with multi-endpoint failover.

    All outbound traffic is gated through an AIGuard instance, which decides
    whether an endpoint may be tried and accumulates fallback/failover/success
    metrics. Every failure path degrades to ``None`` rather than raising, so
    callers can treat the LLM as strictly best-effort.
    """

    def __init__(self, settings: Settings):
        self.settings = settings
        self.guard = AIGuard(settings)

    def extract_json(self, system_prompt: str, user_text: str) -> PromptOutput | None:
        """Ask the chat model for a JSON object and wrap it in a PromptOutput.

        Returns None (and records a fallback) when the LLM is disabled or
        unconfigured, when every endpoint fails, or when the model's reply is
        not valid JSON.
        """
        if not self.settings.llm_enabled or not self.settings.llm_base_url or not self.settings.llm_api_key:
            self.guard.record_fallback()
            return None
        payload = {
            "model": self.settings.llm_model,
            "messages": [
                {"role": "system", "content": system_prompt},
                {"role": "user", "content": user_text},
            ],
            # Low temperature + json_object mode bias the model toward strict JSON.
            "temperature": 0.1,
            "response_format": {"type": "json_object"},
        }
        endpoints = [self.settings.llm_base_url, *self.settings.llm_fallback_base_urls]
        raw_text = self._request_with_failover(
            endpoints=endpoints,
            path="/chat/completions",
            payload=payload,
            api_key=self.settings.llm_api_key,
        )
        if raw_text is None:
            self.guard.record_fallback()
            return None
        # FIX: the model can still emit invalid JSON despite response_format;
        # treat that as a normal fallback instead of propagating JSONDecodeError.
        try:
            content = json.loads(raw_text)
        except (json.JSONDecodeError, TypeError):
            self.guard.record_fallback()
            return None
        return PromptOutput(content=content, raw_text=raw_text)

    def embedding(self, text: str) -> list[float] | None:
        """Return the embedding vector for *text*, or None when unavailable.

        Embedding credentials fall back to the chat-LLM credentials when not
        set. Returns None (recording a fallback where appropriate) on missing
        config, request failure, or a malformed provider response.
        """
        if not self.settings.embedding_enabled:
            return None
        base_url = self.settings.embedding_base_url or self.settings.llm_base_url
        api_key = self.settings.embedding_api_key or self.settings.llm_api_key
        if not base_url or not api_key:
            self.guard.record_fallback()
            return None
        payload = {
            "model": self.settings.embedding_model,
            "input": text,
        }
        endpoints = [base_url, *self.settings.embedding_fallback_base_urls]
        data = self._request_with_failover(
            endpoints=endpoints,
            path="/embeddings",
            payload=payload,
            api_key=api_key,
            return_full_response=True,
        )
        if data is None:
            self.guard.record_fallback()
            return None
        # FIX: guard against malformed responses (missing keys, empty "data")
        # instead of raising KeyError/IndexError out of a best-effort helper.
        try:
            embedding = data["data"][0]["embedding"]
        except (KeyError, IndexError, TypeError):
            self.guard.record_fallback()
            return None
        if not isinstance(embedding, list):
            return None
        return [float(item) for item in embedding]

    def metrics(self) -> dict:
        """Expose the guard's current counters (snapshot is guard-defined)."""
        return self.guard.snapshot()

    def _request_with_failover(
        self,
        endpoints: list[str],
        path: str,
        payload: dict,
        api_key: str,
        return_full_response: bool = False,
    ):
        """POST *payload* to the first usable endpoint, failing over in order.

        Empty/None entries in *endpoints* are skipped. Returns the parsed JSON
        body when ``return_full_response`` is true, otherwise the first chat
        choice's message content; returns None when every endpoint fails.
        """
        if not endpoints:
            return None
        for index, endpoint in enumerate([item for item in endpoints if item]):
            # Presumably a circuit-breaker style check — skip endpoints the
            # guard currently disallows (see AIGuard; second value unused here).
            allowed, _ = self.guard.allow_request(endpoint)
            if not allowed:
                continue
            if index > 0:
                # Any attempt past the primary endpoint counts as a failover.
                self.guard.record_failover()
            try:
                headers = {"Authorization": f"Bearer {api_key}"}
                with httpx.Client(timeout=self.settings.ai_request_timeout_seconds) as client:
                    response = client.post(f"{endpoint.rstrip('/')}{path}", json=payload, headers=headers)
                    response.raise_for_status()
                    data = response.json()
                self.guard.record_success(endpoint)
                if return_full_response:
                    return data
                return data["choices"][0]["message"]["content"]
            except Exception:
                # Deliberately broad: any transport/HTTP/parse error marks this
                # endpoint failed and moves on to the next candidate.
                self.guard.record_failure(endpoint)
                continue
        return None