74 lines
2.5 KiB
Python
74 lines
2.5 KiB
Python
from __future__ import annotations
|
||
|
||
from typing import Any
|
||
|
||
import httpx
|
||
|
||
from ..settings import settings
|
||
|
||
|
||
def _rule_based_summary(query: str, retrieved: list[dict[str, Any]]) -> dict[str, Any]:
|
||
top = retrieved[:3]
|
||
bullets = []
|
||
for r in top:
|
||
pid = r.get("product_id") or r.get("id") or "-"
|
||
title = r.get("title") or ""
|
||
follow = r.get("follow_score")
|
||
life = r.get("lifecycle")
|
||
bullets.append(f"- {pid} {title}(跟卖指数={follow} 生命周期={life})")
|
||
return {
|
||
"mode": "rules_only",
|
||
"query": query,
|
||
"retrieved": retrieved,
|
||
"answer": "基于当前向量库/指标的规则摘要:\n" + "\n".join(bullets),
|
||
}
|
||
|
||
|
||
def generate_insight(query: str, product_id: str | None, top_k: int) -> dict[str, Any]:
|
||
"""
|
||
这里先做“可运行的最小闭环”:
|
||
- 向量检索(尚未实现时返回空)
|
||
- 有 OPENAI_API_KEY 则调用 LLM,输出结构化建议
|
||
- 否则返回规则引擎摘要
|
||
"""
|
||
# TODO: 接入向量库检索(Milvus/Azure 等)。先保留协议,保证前端可用。
|
||
retrieved: list[dict[str, Any]] = []
|
||
|
||
if not settings.openai_api_key:
|
||
return _rule_based_summary(query, retrieved)
|
||
|
||
prompt = f"""你是电商数据分析与选品决策助手。
|
||
用户问题:{query}
|
||
|
||
请输出一个“发现爆款 -> 数据验证 -> 决策跟卖”的闭环建议,包含:
|
||
1) 结论摘要(3-5条)
|
||
2) 数据证据(引用关键指标:销量/增速/竞争/生命周期)
|
||
3) 风险点与反例(至少3条)
|
||
4) 可执行动作(选品、备货、投流、供应链)
|
||
如果没有足够数据,请明确说明缺口,并给出最小补充数据清单。
|
||
"""
|
||
|
||
headers = {"Authorization": f"Bearer {settings.openai_api_key}"}
|
||
payload = {
|
||
"model": settings.openai_model,
|
||
"input": prompt,
|
||
}
|
||
|
||
try:
|
||
with httpx.Client(timeout=30.0) as client:
|
||
r = client.post("https://api.openai.com/v1/responses", headers=headers, json=payload)
|
||
r.raise_for_status()
|
||
data = r.json()
|
||
text = ""
|
||
for item in data.get("output", []):
|
||
for c in item.get("content", []):
|
||
if c.get("type") in ("output_text", "text"):
|
||
text += c.get("text", "")
|
||
return {"mode": "llm", "query": query, "retrieved": retrieved, "answer": text.strip()}
|
||
except Exception as e:
|
||
out = _rule_based_summary(query, retrieved)
|
||
out["mode"] = "rules_fallback"
|
||
out["error"] = str(e)
|
||
return out
|
||
|