feat: new file

This commit is contained in:
Daniel
2026-03-18 18:57:58 +08:00
commit d0ff049899
31 changed files with 1507 additions and 0 deletions

16
.gitignore vendored Normal file
View File

@@ -0,0 +1,16 @@
**/.DS_Store
**/.env
**/.env.*
**/node_modules
**/.next
**/dist
**/__pycache__
**/*.pyc
**/.pytest_cache
**/.venv
**/venv
**/data
**/*.log
# local secrets / notes
sql.md

70
README.md Normal file
View File

@@ -0,0 +1,70 @@
# Crawl_demo — BI 看板 + 趋势引擎 + AI 分析
本项目从 MySQL(`sql.md`)抽取电商数据,生成 BI 数据看板,并实现:
- 实时曲线:销量/GMV/排名等核心指标的近实时刷新
- 趋势预测:潜伏期识别、爆发力评分、时间序列预测
- 决策辅助:跟卖指数、生命周期预警
- AI 分析:基于向量库检索 + LLM总结“为什么它在涨、是否值得跟卖、风险点”
## 目录
- `backend/`:FastAPI(指标 API、趋势引擎、AI 检索/分析、ETL 任务)
- `frontend/`:Next.js(BI 看板 UI)
## 快速开始
1) 复制环境变量
```bash
cp backend/.env.example backend/.env
cp frontend/.env.example frontend/.env.local
```
### 一键 Docker 启动(推荐)
在项目根目录执行:
```bash
sh dev.sh
```
然后打开:
- 前端:`http://localhost:3001`
- 后端:`http://localhost:8000/docs`
启动脚本会自动输出:
- 容器 `ps` 状态
- 前后端最近日志(tail)
- HTTP 快速自检结果(backend/frontend/api-proxy)
2) 启动后端与前端(本地开发)
```bash
cd backend && python -m venv .venv && source .venv/bin/activate
pip install -r requirements.txt
uvicorn app.main:app --reload --port 8000
```
```bash
cd frontend
npm install
npm run dev
```
3) 打开页面
- 前端:`http://localhost:3000`
- 后端:`http://localhost:8000/docs`
## AI / 向量库
本项目默认支持 Milvus(向量数据库)。你也可以替换为 Azure SQL / Azure AI Search 等“MS 系”向量能力:
- 先把 embedding 与 upsert 部分抽象在 `backend/app/vector/`
- 替换实现即可
> 如果未配置 `OPENAI_API_KEY`AI 分析接口会降级为规则引擎的“可解释摘要”,不调用 LLM。

18
backend/Dockerfile Normal file
View File

@@ -0,0 +1,18 @@
# Backend image: FastAPI app served by uvicorn on container port 8000.
FROM python:3.12-slim

WORKDIR /app

# No .pyc files in the image; unbuffered stdout so `docker logs` is live.
ENV PYTHONDONTWRITEBYTECODE=1
ENV PYTHONUNBUFFERED=1

RUN pip install --no-cache-dir --upgrade pip

# Install dependencies before copying sources so code edits reuse this cache layer.
COPY requirements.txt /app/requirements.txt
RUN pip install --no-cache-dir -r /app/requirements.txt

COPY app /app/app

EXPOSE 8000
CMD ["uvicorn", "app.main:app", "--host", "0.0.0.0", "--port", "8000"]

1
backend/app/__init__.py Normal file
View File

@@ -0,0 +1 @@

15
backend/app/db.py Normal file
View File

@@ -0,0 +1,15 @@
from __future__ import annotations
from sqlalchemy import create_engine
from sqlalchemy.engine import Engine
from .settings import settings
def get_engine() -> Engine:
    """Return a process-wide SQLAlchemy engine for the configured MySQL database.

    The engine (and its connection pool) is created lazily on first call and
    memoized on the function object. The original implementation created a new
    engine -- and therefore a new connection pool -- on every call, leaking
    pools under load.
    """
    engine = getattr(get_engine, "_engine", None)
    if engine is None:
        engine = create_engine(
            settings.mysql_url,
            # Validate pooled connections before use (survives MySQL idle timeouts).
            pool_pre_ping=True,
            # Recycle connections hourly to stay under MySQL's wait_timeout.
            pool_recycle=3600,
        )
        get_engine._engine = engine  # type: ignore[attr-defined]
    return engine

43
backend/app/main.py Normal file
View File

@@ -0,0 +1,43 @@
from __future__ import annotations
from fastapi import FastAPI, Request
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import JSONResponse
from sqlalchemy.exc import OperationalError
from .settings import settings
from .routes import ai, debug, metrics, trend
from .services.db_sample import print_db_sample_to_logs

# Application entry point: wires CORS, the API routers and DB error handling.
app = FastAPI(title="Crawl BI Backend", version="0.1.0")


@app.exception_handler(OperationalError)
async def db_operational_error_handler(_: Request, exc: OperationalError):
    """Map SQLAlchemy OperationalError (DB unreachable/refused) to a 503 JSON reply."""
    return JSONResponse(
        status_code=503,
        content={
            "detail": "数据库连接失败(请检查 MYSQL_HOST/USER/PASSWORD 以及 MySQL 授权 host/IP 白名单)。",
            # exc.orig is the underlying DB-API error when present.
            "error": str(exc.orig) if getattr(exc, "orig", None) else str(exc),
        },
    )


# Allowed browser origins come from APP_CORS_ORIGINS (comma-separated).
app.add_middleware(
    CORSMiddleware,
    allow_origins=settings.cors_origins_list,
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

app.include_router(metrics.router, prefix="/api/metrics", tags=["metrics"])
app.include_router(trend.router, prefix="/api/trend", tags=["trend"])
app.include_router(ai.router, prefix="/api/ai", tags=["ai"])
app.include_router(debug.router, prefix="/api/debug", tags=["debug"])


@app.on_event("startup")
def _startup_print_sample() -> None:
    """Optionally dump DB structure + sample rows to the logs (DEBUG_PRINT_DB_SAMPLE)."""
    if settings.debug_print_db_sample:
        print_db_sample_to_logs()

View File

@@ -0,0 +1 @@

24
backend/app/routes/ai.py Normal file
View File

@@ -0,0 +1,24 @@
from __future__ import annotations
from fastapi import APIRouter
from pydantic import BaseModel, Field
from ..services.ai_insight import generate_insight
router = APIRouter()


class InsightRequest(BaseModel):
    # Free-text question for the insight engine (required, bounded length).
    query: str = Field(..., min_length=1, max_length=2000)
    # Optional product to focus the analysis on.
    product_id: str | None = None
    # Number of vector-search hits to feed into the summary.
    top_k: int = Field(6, ge=1, le=20)


@router.post("/insight")
def insight(req: InsightRequest):
    """
    Vector retrieval + optional LLM: returns a "discover winner -> validate with
    data -> decide whether to follow-sell" style recommendation
    (see services.ai_insight.generate_insight).
    """
    return generate_insight(query=req.query, product_id=req.product_id, top_k=req.top_k)

View File

@@ -0,0 +1,18 @@
from __future__ import annotations
from fastapi import APIRouter, Query
from ..services.db_sample import print_db_sample_to_logs
router = APIRouter()


@router.post("/db-sample")
def db_sample(limit: int = Query(10, ge=1, le=200)):
    """
    Trigger one "print database sample rows to the backend logs" pass, useful to
    quickly understand the data's content and structure. Returns a small ack.
    """
    print_db_sample_to_logs(limit=limit)
    return {"ok": True, "printed": True, "limit": limit}

View File

@@ -0,0 +1,56 @@
from __future__ import annotations
from datetime import datetime, timedelta
import pandas as pd
from fastapi import APIRouter, HTTPException, Query
from sqlalchemy import text
from ..db import get_engine
from ..services.schema_discovery import discover_schema
from ..services.timeseries import normalize_timeseries
router = APIRouter()


@router.get("/overview")
def overview():
    """
    Top-of-dashboard KPI cards, inferred from whatever tables exist.

    Raises 422 when schema discovery cannot find a usable sales/order table
    (needs at least product_id + timestamp + quantity/amount columns).
    """
    engine = get_engine()
    schema = discover_schema(engine)
    if not schema.sales_table:
        raise HTTPException(status_code=422, detail="未发现可用销量/订单明细表(需要至少包含 product_id + 时间 + 数量/金额)")
    q = text(schema.overview_sql)
    with engine.connect() as conn:
        row = conn.execute(q).mappings().first()
    # Echo the discovered schema so the frontend can show what was inferred.
    return {"schema": schema.model_dump(), "metrics": dict(row) if row else {}}
@router.get("/sales/timeseries")
def sales_timeseries(
    product_id: str = Query(..., min_length=1),
    days: int = Query(30, ge=1, le=365),
):
    """Daily units/GMV series for one product over the last `days` days."""
    engine = get_engine()
    schema = discover_schema(engine)
    if not schema.sales_table:
        raise HTTPException(status_code=422, detail="未发现可用销量/订单明细表")
    # NOTE(review): naive UTC timestamp; assumes the time column stores UTC — confirm.
    since = datetime.utcnow() - timedelta(days=days)
    q = text(schema.timeseries_sql)
    with engine.connect() as conn:
        df = pd.read_sql(
            q,
            conn,
            params={"product_id": product_id, "since": since},
        )
    if df.empty:
        return {"product_id": product_id, "points": []}
    # Coerce timestamps/values into JSON-friendly points sorted by day.
    points = normalize_timeseries(df, ts_col="ds", value_cols=["units", "gmv"])
    return {"product_id": product_id, "points": points}

View File

@@ -0,0 +1,64 @@
from __future__ import annotations
import math
from datetime import datetime, timedelta
import pandas as pd
from fastapi import APIRouter, HTTPException, Query
from sqlalchemy import text
from ..db import get_engine
from ..services.forecast import forecast_next_n
from ..services.schema_discovery import discover_schema
from ..services.trend_engine import compute_trend_scores
router = APIRouter()


@router.get("/potential-winners")
def potential_winners(days: int = Query(14, ge=3, le=60), limit: int = Query(50, ge=1, le=200)):
    """Rank products by potential_score over the last `days` days; return the top `limit`."""
    engine = get_engine()
    schema = discover_schema(engine)
    if not schema.sales_table:
        raise HTTPException(status_code=422, detail="未发现可用销量/订单明细表")
    since = datetime.utcnow() - timedelta(days=days)
    q = text(schema.trend_candidates_sql)
    with engine.connect() as conn:
        # Over-fetch (limit * 5) so the scorer has a candidate pool to re-rank.
        df = pd.read_sql(q, conn, params={"since": since, "limit": limit * 5})
    if df.empty:
        return {"items": []}
    scored = compute_trend_scores(df)
    scored = scored.sort_values("potential_score", ascending=False).head(limit)
    return {"items": scored.to_dict(orient="records")}
@router.get("/forecast")
def forecast(
    product_id: str = Query(..., min_length=1),
    days: int = Query(30, ge=7, le=180),
    horizon: int = Query(14, ge=1, le=60),
):
    """Forecast daily units for one product: fit on `days` of history, predict `horizon` days."""
    engine = get_engine()
    schema = discover_schema(engine)
    if not schema.sales_table:
        raise HTTPException(status_code=422, detail="未发现可用销量/订单明细表")
    since = datetime.utcnow() - timedelta(days=days)
    q = text(schema.timeseries_sql)
    with engine.connect() as conn:
        df = pd.read_sql(q, conn, params={"product_id": product_id, "since": since})
    if df.empty:
        return {"product_id": product_id, "forecast": []}
    df = df.sort_values("ds")
    y = df["units"].astype(float).fillna(0.0).values
    yhat = forecast_next_n(y, n=horizon)
    # Project forecast points one day at a time past the last observed date;
    # clamp negatives to zero (unit sales cannot be negative).
    start = pd.to_datetime(df["ds"]).max()
    out = []
    for i, v in enumerate(yhat, start=1):
        out.append({"ds": (start + pd.Timedelta(days=i)).to_pydatetime().isoformat(), "units_hat": float(max(0.0, v))})
    return {"product_id": product_id, "forecast": out}

View File

@@ -0,0 +1,73 @@
from __future__ import annotations
from typing import Any
import httpx
from ..settings import settings
def _rule_based_summary(query: str, retrieved: list[dict[str, Any]]) -> dict[str, Any]:
    """Deterministic, rule-only summary of the top retrieved items (no LLM).

    Used when no API key is configured, or as a fallback when the LLM call
    fails; mirrors the payload shape of the LLM path.
    """

    def _bullet(item: dict[str, Any]) -> str:
        pid = item.get("product_id") or item.get("id") or "-"
        title = item.get("title") or ""
        follow = item.get("follow_score")
        life = item.get("lifecycle")
        return f"- {pid} {title}(跟卖指数={follow} 生命周期={life}"

    bullets = [_bullet(item) for item in retrieved[:3]]
    return {
        "mode": "rules_only",
        "query": query,
        "retrieved": retrieved,
        "answer": "基于当前向量库/指标的规则摘要:\n" + "\n".join(bullets),
    }
def generate_insight(query: str, product_id: str | None, top_k: int) -> dict[str, Any]:
    """
    Minimal runnable loop for the AI insight endpoint:
    - vector retrieval (returns an empty list until a vector store is wired in)
    - if OPENAI_API_KEY is configured, call the LLM for a structured suggestion
    - otherwise fall back to the rule-engine summary

    `product_id` and `top_k` are accepted for API stability; they will drive
    retrieval once the vector store is implemented.
    """
    # TODO: wire in vector retrieval (Milvus/Azure etc.). The response contract
    # is kept stable so the frontend already works.
    retrieved: list[dict[str, Any]] = []
    if not settings.openai_api_key:
        return _rule_based_summary(query, retrieved)
    prompt = f"""你是电商数据分析与选品决策助手。
用户问题:{query}
请输出一个“发现爆款 -> 数据验证 -> 决策跟卖”的闭环建议,包含:
1) 结论摘要3-5条
2) 数据证据(引用关键指标:销量/增速/竞争/生命周期)
3) 风险点与反例至少3条
4) 可执行动作(选品、备货、投流、供应链)
如果没有足够数据,请明确说明缺口,并给出最小补充数据清单。
"""
    headers = {"Authorization": f"Bearer {settings.openai_api_key}"}
    payload = {
        "model": settings.openai_model,
        "input": prompt,
    }
    try:
        # OpenAI Responses API; the answer text is spread over content parts.
        with httpx.Client(timeout=30.0) as client:
            r = client.post("https://api.openai.com/v1/responses", headers=headers, json=payload)
            r.raise_for_status()
            data = r.json()
        text = ""
        for item in data.get("output", []):
            for c in item.get("content", []):
                if c.get("type") in ("output_text", "text"):
                    text += c.get("text", "")
        return {"mode": "llm", "query": query, "retrieved": retrieved, "answer": text.strip()}
    except Exception as e:
        # Any network/API failure degrades to the rule summary, reporting the error.
        out = _rule_based_summary(query, retrieved)
        out["mode"] = "rules_fallback"
        out["error"] = str(e)
        return out

View File

@@ -0,0 +1,105 @@
from __future__ import annotations
import json
import logging
from typing import Any
import pandas as pd
from sqlalchemy import Engine, text
from ..db import get_engine
from ..services.schema_discovery import discover_schema
from ..settings import settings
log = logging.getLogger("db_sample")
def _truncate_value(v: Any, max_len: int) -> Any:
    """Pass scalars/None through unchanged; stringify everything else, capped.

    Strings longer than `max_len` are cut and suffixed with "..." so log lines
    stay bounded. The slice is clamped at zero: the original `s[:max_len - 3]`
    went negative for `max_len` < 3 and produced output LONGER than the cap.
    """
    if v is None:
        return None
    if isinstance(v, (int, float, bool)):
        return v
    s = str(v)
    if len(s) <= max_len:
        return s
    return s[: max(0, max_len - 3)] + "..."


def _df_to_records(df: pd.DataFrame, max_str_len: int) -> list[dict[str, Any]]:
    """Convert a DataFrame into log-safe records, truncating long strings."""
    return [
        {str(k): _truncate_value(v, max_str_len) for k, v in row.items()}
        for _, row in df.iterrows()
    ]
def _list_tables(engine: Engine) -> list[str]:
    """Return every table name in the current database, alphabetically."""
    query = text(
        """
        SELECT table_name
        FROM information_schema.tables
        WHERE table_schema = DATABASE()
        ORDER BY table_name
        """
    )
    with engine.connect() as conn:
        result = conn.execute(query).all()
    return [record[0] for record in result]
def _table_row_count(engine: Engine, table: str) -> int | None:
    """COUNT(*) for `table`; None when the query fails (permissions, bad name)."""
    try:
        with engine.connect() as conn:
            value = conn.execute(text(f"SELECT COUNT(*) FROM {table}")).scalar()
        return int(value) if value is not None else None
    except Exception:
        return None
def _sample_table(engine: Engine, table: str, limit: int) -> pd.DataFrame:
    """Fetch up to `limit` raw rows from `table` as a DataFrame."""
    stmt = text(f"SELECT * FROM {table} LIMIT {limit}")
    with engine.connect() as conn:
        return pd.read_sql(stmt, conn)
def print_db_sample_to_logs(limit: int | None = None) -> None:
    """
    Log the database structure plus sample rows, for data exploration.
    Long strings are truncated so the logs do not explode.

    `limit` overrides settings.debug_db_sample_limit when given.
    """
    engine = get_engine()
    schema = discover_schema(engine)
    eff_limit = int(limit or settings.debug_db_sample_limit)
    max_str_len = int(settings.debug_db_sample_max_str_len)
    tables = _list_tables(engine)
    # WARNING level so the dump is visible under default log configuration.
    log.warning("DB SAMPLE: discovered schema=%s", schema.model_dump())
    log.warning("DB SAMPLE: tables=%s", tables)
    # prioritize discovered tables first
    prioritized: list[str] = []
    for t in [schema.sales_table, schema.products_table]:
        if t and t in tables and t not in prioritized:
            prioritized.append(t)
    for t in tables:
        if t not in prioritized:
            prioritized.append(t)
    # Cap at 8 tables to bound log volume.
    for t in prioritized[: min(len(prioritized), 8)]:
        cnt = _table_row_count(engine, t)
        try:
            df = _sample_table(engine, t, eff_limit)
            recs = _df_to_records(df, max_str_len=max_str_len)
            log.warning(
                "DB SAMPLE: table=%s rows=%s cols=%s sample=%s",
                t,
                cnt,
                list(df.columns),
                json.dumps(recs, ensure_ascii=False),
            )
        except Exception as e:
            # Best effort: one failing table must not abort the whole dump.
            log.warning("DB SAMPLE: table=%s rows=%s sample_failed=%s", t, cnt, str(e))

View File

@@ -0,0 +1,28 @@
from __future__ import annotations
import numpy as np
from statsmodels.tsa.holtwinters import ExponentialSmoothing
def forecast_next_n(y: np.ndarray, n: int) -> np.ndarray:
    """
    Lightweight forecaster: try additive-trend Holt-Winters (fairly stable even
    on short series); on any failure fall back to a simple moving average.
    """
    series = np.asarray(y, dtype=float)
    # Too little history to fit a model: extend the last value (or zeros).
    if series.size < 3:
        last = series[-1] if series.size else 0.0
        return np.repeat(last, n)
    try:
        fitted = ExponentialSmoothing(
            series,
            trend="add",
            seasonal=None,
            initialization_method="estimated",
        ).fit(optimized=True)
        return np.asarray(fitted.forecast(n), dtype=float)
    except Exception:
        # Fallback: mean of the trailing window (between 3 and 7 points).
        window = int(min(7, max(3, series.size // 2)))
        return np.repeat(float(np.mean(series[-window:])), n)

View File

@@ -0,0 +1,223 @@
from __future__ import annotations
from dataclasses import dataclass
from pydantic import BaseModel
from sqlalchemy import Engine, text
class DiscoveredSchema(BaseModel):
    """Discovered table/column names plus the SQL templates built from them.

    NOTE(review): table and column names come from information_schema (not user
    input) and are interpolated into f-string SQL; only runtime values
    (:product_id, :since, :limit) are bound as parameters.
    """

    # discovered table names
    sales_table: str | None = None
    products_table: str | None = None
    # required columns (in sales_table)
    sales_product_id_col: str | None = None
    sales_time_col: str | None = None
    sales_units_col: str | None = None
    sales_amount_col: str | None = None
    # optional product cols
    product_title_col: str | None = None
    product_created_col: str | None = None
    product_rank_col: str | None = None
    product_category_col: str | None = None
    product_desc_col: str | None = None

    @property
    def overview_sql(self) -> str:
        """30-day KPI rollup: distinct products, units, GMV and row count."""
        # minimal, safe aggregations
        t = self.sales_table
        pid = self.sales_product_id_col
        ts = self.sales_time_col
        units = self.sales_units_col
        amount = self.sales_amount_col
        return f"""
        SELECT
            COUNT(DISTINCT {pid}) AS products,
            SUM(COALESCE({units}, 0)) AS units_30d,
            SUM(COALESCE({amount}, 0)) AS gmv_30d,
            COUNT(*) AS rows_30d
        FROM {t}
        WHERE {ts} >= (UTC_TIMESTAMP() - INTERVAL 30 DAY)
        """

    @property
    def timeseries_sql(self) -> str:
        """Per-day units/GMV for one product; binds :product_id and :since."""
        t = self.sales_table
        pid = self.sales_product_id_col
        ts = self.sales_time_col
        units = self.sales_units_col
        amount = self.sales_amount_col
        return f"""
        SELECT
            DATE({ts}) AS ds,
            SUM(COALESCE({units}, 0)) AS units,
            SUM(COALESCE({amount}, 0)) AS gmv
        FROM {t}
        WHERE {pid} = :product_id
          AND {ts} >= :since
        GROUP BY DATE({ts})
        ORDER BY ds ASC
        """

    @property
    def trend_candidates_sql(self) -> str:
        """Per-product rollups since :since (binds :since and :limit); joins the
        products table only when it shares the sales product-id column name."""
        # produce per-product last-N-day rollups; join products when available
        t = self.sales_table
        pid = self.sales_product_id_col
        ts = self.sales_time_col
        units = self.sales_units_col
        amount = self.sales_amount_col
        p = self.products_table
        title = self.product_title_col
        created = self.product_created_col
        rank = self.product_rank_col
        cat = self.product_category_col
        join = ""
        if p:
            # Only join when the products table uses the same pid column name.
            join = f"LEFT JOIN {p} p ON p.{pid} = s.{pid}" if self._products_has_same_pid_name else f""
        # if we can't confidently join, still return sales-only metrics
        select_p = ""
        if p and join:
            # Missing optional product columns degrade to NULL literals.
            title_expr = f"p.{title}" if title else "NULL"
            cat_expr = f"p.{cat}" if cat else "NULL"
            created_expr = f"p.{created}" if created else "NULL"
            rank_expr = f"p.{rank}" if rank else "NULL"
            select_p = f""",
            {title_expr} AS title,
            {cat_expr} AS category,
            {created_expr} AS created_at,
            {rank_expr} AS rank_now
            """
        return f"""
        SELECT
            s.{pid} AS product_id,
            SUM(COALESCE(s.{units}, 0)) AS units,
            SUM(COALESCE(s.{amount}, 0)) AS gmv,
            COUNT(*) AS records,
            MIN(s.{ts}) AS first_seen,
            MAX(s.{ts}) AS last_seen
            {select_p}
        FROM {t} s
        {join}
        WHERE s.{ts} >= :since
        GROUP BY s.{pid}
        ORDER BY units DESC
        LIMIT :limit
        """

    @property
    def _products_has_same_pid_name(self) -> bool:
        # discovery sets this attribute dynamically via set_products_pid_same();
        # it is kept outside the pydantic field set so model_dump() omits it.
        return getattr(self, "__products_has_same_pid_name", False)

    def set_products_pid_same(self, v: bool) -> None:
        # The literal attribute string is used on both ends (strings are never
        # name-mangled), keeping getattr/setattr consistent with the property.
        setattr(self, "__products_has_same_pid_name", v)
# Candidate column names for schema discovery, checked in priority order
# (first match wins; comparison is case-insensitive, see _pick).
SALES_UNITS_CANDIDATES = ["units", "qty", "quantity", "sales", "sold", "order_qty", "num"]
SALES_AMOUNT_CANDIDATES = ["amount", "gmv", "revenue", "pay_amount", "total", "price", "order_amount"]
TIME_CANDIDATES = ["created_at", "create_time", "created", "ts", "timestamp", "date_time", "paid_at", "order_time"]
PID_CANDIDATES = ["product_id", "item_id", "sku_id", "goods_id", "asin"]
PRODUCT_TITLE_CANDIDATES = ["title", "name", "product_name", "item_title"]
PRODUCT_DESC_CANDIDATES = ["description", "desc", "detail"]
PRODUCT_CREATED_CANDIDATES = ["created_at", "create_time", "created"]
PRODUCT_RANK_CANDIDATES = ["rank", "bsr_rank", "position"]
PRODUCT_CATEGORY_CANDIDATES = ["category", "cat", "category_name"]
def _lower(s: str | None) -> str:
return (s or "").lower()
def _pick(cols: list[str], candidates: list[str]) -> str | None:
cols_l = {_lower(c): c for c in cols}
for cand in candidates:
if cand in cols_l:
return cols_l[cand]
return None
def discover_schema(engine: Engine) -> DiscoveredSchema:
    """
    "Good enough" auto-discovery when the table layout is unknown:
    - prefer a table holding product_id + timestamp + quantity/amount as sales_table
    - pick a table carrying title/name-like columns as products_table
    """
    with engine.connect() as conn:
        rows = conn.execute(
            text(
                """
                SELECT table_name, column_name
                FROM information_schema.columns
                WHERE table_schema = DATABASE()
                ORDER BY table_name, ordinal_position
                """
            )
        ).all()
    by_table: dict[str, list[str]] = {}
    for t, c in rows:
        by_table.setdefault(t, []).append(c)
    # Best hits so far: (score, table_name, matched column mapping).
    best_sales: tuple[int, str, dict[str, str]] | None = None
    best_products: tuple[int, str, dict[str, str]] | None = None
    for t, cols in by_table.items():
        pid = _pick(cols, PID_CANDIDATES)
        ts = _pick(cols, TIME_CANDIDATES)
        units = _pick(cols, SALES_UNITS_CANDIDATES)
        amount = _pick(cols, SALES_AMOUNT_CANDIDATES)
        # Weighted score: pid/time (3 each), units (2), amount (1).
        # NOTE(review): pid + time alone reaches the threshold of 6, leaving
        # units/amount None below — confirm that is intended.
        score = 0
        if pid:
            score += 3
        if ts:
            score += 3
        if units:
            score += 2
        if amount:
            score += 1
        if score >= 6:
            if best_sales is None or score > best_sales[0]:
                best_sales = (score, t, {"pid": pid, "ts": ts, "units": units, "amount": amount})
        title = _pick(cols, PRODUCT_TITLE_CANDIDATES)
        if title:
            # Products-table score: title (2) + pid (2) + category (1) + desc (1).
            pscore = 2
            if _pick(cols, PID_CANDIDATES):
                pscore += 2
            if _pick(cols, PRODUCT_CATEGORY_CANDIDATES):
                pscore += 1
            if _pick(cols, PRODUCT_DESC_CANDIDATES):
                pscore += 1
            if best_products is None or pscore > best_products[0]:
                best_products = (pscore, t, {"title": title})
    schema = DiscoveredSchema()
    if best_sales:
        _, t, m = best_sales
        schema.sales_table = t
        schema.sales_product_id_col = m["pid"]
        schema.sales_time_col = m["ts"]
        # NOTE(review): when neither units nor amount was found, these stay
        # None and the SQL templates interpolate the literal "None" — confirm.
        schema.sales_units_col = m["units"] or m["amount"]  # last resort
        schema.sales_amount_col = m["amount"] or m["units"]
    if best_products:
        _, pt, _ = best_products
        schema.products_table = pt
        cols = by_table.get(pt, [])
        schema.product_title_col = _pick(cols, PRODUCT_TITLE_CANDIDATES)
        schema.product_desc_col = _pick(cols, PRODUCT_DESC_CANDIDATES)
        schema.product_created_col = _pick(cols, PRODUCT_CREATED_CANDIDATES)
        schema.product_rank_col = _pick(cols, PRODUCT_RANK_CANDIDATES)
        schema.product_category_col = _pick(cols, PRODUCT_CATEGORY_CANDIDATES)
        # Joining in trend_candidates_sql is only safe when the products table
        # uses the same pid column name as the sales table.
        schema.set_products_pid_same(_pick(cols, PID_CANDIDATES) == schema.sales_product_id_col)
    return schema

View File

@@ -0,0 +1,21 @@
from __future__ import annotations
import pandas as pd
def normalize_timeseries(df: pd.DataFrame, ts_col: str, value_cols: list[str]) -> list[dict]:
    """
    Convert a raw query DataFrame into JSON-safe time-series points.

    - `ts_col` is coerced to datetime; rows with unparseable timestamps are dropped.
    - Each value column is coerced to float; None/NaN and non-numeric values
      become 0.0. (The original passed NaN straight through `float()`, emitting
      NaN values that are not valid JSON.)

    Returns a list of {"ds": <ISO timestamp>, <col>: float, ...} sorted by time.
    """
    df = df.copy()
    df[ts_col] = pd.to_datetime(df[ts_col], errors="coerce")
    df = df.dropna(subset=[ts_col]).sort_values(ts_col)
    out = []
    for _, r in df.iterrows():
        p = {"ds": r[ts_col].to_pydatetime().isoformat()}
        for c in value_cols:
            v = r.get(c)
            try:
                # pd.isna also screens NaN/NaT, which float() would pass through.
                p[c] = 0.0 if v is None or pd.isna(v) else float(v)
            except Exception:
                p[c] = 0.0
        out.append(p)
    return out

View File

@@ -0,0 +1,108 @@
from __future__ import annotations
import math
import numpy as np
import pandas as pd
def _sigmoid(x: float) -> float:
    """Logistic function: squash `x` into the open interval (0, 1)."""
    denominator = 1.0 + math.exp(-x)
    return 1.0 / denominator
def compute_trend_scores(df: pd.DataFrame) -> pd.DataFrame:
    """
    Explainable scoring over whatever columns are available (schema uncertain):
    - Potential winners: recently-appeared products with strong metrics
    - Burst: blend tiktok/search signals when the columns exist, otherwise
      proxy via sales/GMV scale + freshness
    - Decision aid: follow-sell index + lifecycle warning (missing optional
      fields are automatically down-weighted)

    Adds potential_score, burst_score, follow_score (all clipped to [0, 1])
    and a categorical `lifecycle` column; input columns are preserved.
    """
    out = df.copy()
    now = pd.Timestamp.now(tz="UTC")
    # Freshness: the more recent first_seen, the higher; missing -> very old.
    first_seen = pd.to_datetime(out.get("first_seen"), errors="coerce", utc=True)
    age_days = (now - first_seen).dt.total_seconds() / 86400.0
    age_days = age_days.fillna(999.0).clip(lower=0.0)
    units = pd.to_numeric(out.get("units"), errors="coerce").fillna(0.0)
    gmv = pd.to_numeric(out.get("gmv"), errors="coerce").fillna(0.0)
    # Scale normalization: log1p tames heavy-tailed sales/GMV distributions.
    units_s = np.log1p(units)
    gmv_s = np.log1p(gmv)
    freshness = 1.0 / (1.0 + (age_days / 7.0))  # 0~1
    scale = (units_s.rank(pct=True) * 0.6 + gmv_s.rank(pct=True) * 0.4).clip(0.0, 1.0)
    out["potential_score"] = (freshness * 0.55 + scale * 0.45).clip(0.0, 1.0)
    # Burst: prefer external heat signals when those columns are present.
    tiktok_raw = out["tiktok_hot"] if "tiktok_hot" in out.columns else pd.Series(np.nan, index=out.index)
    search_raw = out["search_growth"] if "search_growth" in out.columns else pd.Series(np.nan, index=out.index)
    tiktok = pd.to_numeric(tiktok_raw, errors="coerce")
    search_g = pd.to_numeric(search_raw, errors="coerce")
    if tiktok.notna().any() or search_g.notna().any():
        # Fill gaps with the column median (or 0 when fully absent).
        tiktok_s = tiktok.fillna(tiktok.median() if tiktok.notna().any() else 0.0)
        search_s = search_g.fillna(search_g.median() if search_g.notna().any() else 0.0)
        burst = (
            pd.Series(tiktok_s).rank(pct=True) * 0.6
            + pd.Series(search_s).rank(pct=True) * 0.4
        ).clip(0.0, 1.0)
    else:
        # No external heat columns: proxy burst with scale + freshness.
        burst = (scale * 0.65 + freshness * 0.35).clip(0.0, 1.0)
    out["burst_score"] = burst
    # Follow-sell index: competition (record count) weighs negatively; margin
    # and supply-chain difficulty are down-weighted when missing.
    records = pd.to_numeric(out.get("records"), errors="coerce").fillna(0.0)
    competition = records.rank(pct=True).clip(0.0, 1.0)  # higher = more crowded
    margin_raw = out["margin"] if "margin" in out.columns else pd.Series(np.nan, index=out.index)
    margin = pd.to_numeric(margin_raw, errors="coerce")
    if margin.notna().any():
        margin_s = margin.fillna(margin.median()).rank(pct=True).clip(0.0, 1.0)
        margin_w = 0.35
    else:
        # Neutral placeholder at reduced weight when margin is unknown.
        margin_s = pd.Series(0.5, index=out.index)
        margin_w = 0.15
    supply_raw = out["supply_difficulty"] if "supply_difficulty" in out.columns else pd.Series(np.nan, index=out.index)
    supply = pd.to_numeric(supply_raw, errors="coerce")
    if supply.notna().any():
        supply_s = (1.0 - supply.fillna(supply.median()).rank(pct=True)).clip(0.0, 1.0)  # harder supply = lower score
        supply_w = 0.20
    else:
        supply_s = pd.Series(0.5, index=out.index)
        supply_w = 0.10
    # Trend contributes positively.
    trend = (out["potential_score"] * 0.5 + out["burst_score"] * 0.5).clip(0.0, 1.0)
    trend_w = 0.45
    comp_w = 0.20
    follow = (
        trend * trend_w
        + margin_s * margin_w
        + supply_s * supply_w
        + (1.0 - competition) * comp_w
    )
    out["follow_score"] = follow.clip(0.0, 1.0)
    # Lifecycle warning (simplified): old + stagnant + crowded => red ocean / decline.
    lifecycle = []
    for i in out.index:
        a = float(age_days.loc[i])
        comp = float(competition.loc[i])
        tr = float(trend.loc[i])
        if a > 120 and comp > 0.7 and tr < 0.4:
            lifecycle.append("decline_or_red_ocean")
        elif a > 60 and comp > 0.75:
            lifecycle.append("red_ocean")
        elif a < 21 and tr > 0.65:
            lifecycle.append("early_growth")
        else:
            lifecycle.append("normal")
    out["lifecycle"] = lifecycle
    return out

42
backend/app/settings.py Normal file
View File

@@ -0,0 +1,42 @@
from __future__ import annotations
from pydantic_settings import BaseSettings, SettingsConfigDict
class Settings(BaseSettings):
    """Application configuration, loaded from environment variables / `.env`.

    Field names map to env vars case-insensitively (e.g. `mysql_host` <-
    MYSQL_HOST); unknown env entries are ignored (extra="ignore").
    """

    model_config = SettingsConfigDict(env_file=".env", env_file_encoding="utf-8", extra="ignore")

    # MySQL connection (host/user/password/database are required).
    mysql_host: str
    mysql_port: int = 3306
    mysql_user: str
    mysql_password: str
    mysql_database: str

    # OpenAI: no key -> AI insight degrades to the rule engine.
    openai_api_key: str | None = None
    openai_model: str = "gpt-4.1-mini"
    openai_embed_model: str = "text-embedding-3-small"

    # Milvus vector store.
    milvus_host: str = "localhost"
    milvus_port: int = 19530
    milvus_collection: str = "products_v1"

    app_env: str = "dev"
    # Comma-separated list of allowed CORS origins.
    app_cors_origins: str = "http://localhost:3000"

    # Debug helpers: dump DB sample rows to the logs on startup.
    debug_print_db_sample: bool = False
    debug_db_sample_limit: int = 10
    debug_db_sample_max_str_len: int = 120

    @property
    def mysql_url(self) -> str:
        """SQLAlchemy URL for PyMySQL with utf8mb4 (full Unicode support)."""
        return (
            f"mysql+pymysql://{self.mysql_user}:{self.mysql_password}"
            f"@{self.mysql_host}:{self.mysql_port}/{self.mysql_database}?charset=utf8mb4"
        )

    @property
    def cors_origins_list(self) -> list[str]:
        """app_cors_origins split on commas, blanks removed."""
        return [o.strip() for o in self.app_cors_origins.split(",") if o.strip()]


# Singleton read at import time; raises if required env vars are missing.
settings = Settings()

13
backend/requirements.txt Normal file
View File

@@ -0,0 +1,13 @@
fastapi==0.115.11
uvicorn[standard]==0.34.0
pydantic==2.10.6
pydantic-settings==2.8.1
sqlalchemy==2.0.39
pymysql==1.1.1
pandas==2.2.3
numpy==2.2.3
scikit-learn==1.6.1
statsmodels==0.14.4
httpx==0.28.1
tenacity==9.0.0
python-dotenv==1.0.1

100
dev.sh Executable file
View File

@@ -0,0 +1,100 @@
#!/usr/bin/env sh
# dev.sh — one-command Docker workflow: build/start containers, show status,
# tail recent logs and run a quick HTTP self-check.
set -eu

# Always operate from the repository root (the directory holding this script).
ROOT_DIR="$(cd "$(dirname "$0")" && pwd)"
cd "$ROOT_DIR"

MODE="${1:-up}"

# Preconditions: docker CLI, a running daemon, and the compose plugin.
if ! command -v docker >/dev/null 2>&1; then
  echo "Docker 未安装或不可用。请先安装 Docker Desktop。" >&2
  exit 1
fi
if ! docker info >/dev/null 2>&1; then
  echo "Docker daemon 未启动。请先启动 Docker Desktop。" >&2
  exit 1
fi
# (fix) the original redundantly re-checked `command -v docker` here; only the
# compose plugin still needs verifying at this point.
if ! docker compose version >/dev/null 2>&1; then
  echo "docker compose 不可用,请升级 Docker Desktop。" >&2
  exit 1
fi

# First run: seed backend/.env from the committed example file.
if [ ! -f "backend/.env" ]; then
  cp "backend/.env.example" "backend/.env"
  echo "已生成 backend/.env请把 MYSQL_PASSWORD / OPENAI_API_KEY 改成你的值)"
fi

case "$MODE" in
  up)
    echo "启动中:构建并启动所有容器(随后将跟随日志,按 Ctrl+C 退出跟随)..."
    docker compose up --build -d
    ;;
  -d|detached)
    echo "启动中:构建并后台启动所有容器..."
    docker compose up --build -d
    ;;
  down)
    docker compose down
    exit 0
    ;;
  ps)
    docker compose ps
    exit 0
    ;;
  logs)
    docker compose logs --tail=200 backend frontend
    exit 0
    ;;
  follow|f)
    docker compose logs -f --tail=200 backend frontend
    exit 0
    ;;
  *)
    echo "用法:"
    echo " sh dev.sh # 启动并跟随日志(默认)"
    echo " sh dev.sh -d # 后台启动"
    echo " sh dev.sh follow # 跟随日志"
    echo " sh dev.sh logs # 打印最近日志"
    echo " sh dev.sh ps # 查看容器状态"
    echo " sh dev.sh down # 停止并清理"
    exit 1
    ;;
esac

# Status + recent logs for both services (|| true keeps set -e from aborting).
echo ""
echo "容器状态:"
docker compose ps
echo ""
echo "后端最近日志backendtail=120"
docker compose logs --tail=120 backend || true
echo ""
echo "前端最近日志frontendtail=120"
docker compose logs --tail=120 frontend || true
echo ""
echo "快速自检HTTP"
(
  curl -fsS "http://localhost:8000/docs" >/dev/null 2>&1 && echo "- backend: OK (http://localhost:8000/docs)" || echo "- backend: FAIL (检查 backend 日志/数据库连通性)"
  curl -fsS "http://localhost:3001" >/dev/null 2>&1 && echo "- frontend: OK (http://localhost:3001)" || echo "- frontend: FAIL (检查 frontend 日志)"
  curl -fsS "http://localhost:3001/api/metrics/overview" >/dev/null 2>&1 && echo "- api-proxy: OK (/api/metrics/overview)" || echo "- api-proxy: FAIL (通常是 backend 500)"
) || true
echo ""
echo "已启动:"
echo "- 前端: http://localhost:3001"
echo "- 后端 Swagger: http://localhost:8000/docs"
echo "- Milvus: localhost:19530"
echo ""
echo "跟随日志sh dev.sh follow (或 docker compose logs -f --tail=200 backend frontend"
echo "停止docker compose down"

# Default mode: attach to logs (Ctrl+C detaches without stopping containers).
if [ "$MODE" = "up" ]; then
  echo ""
  echo "开始跟随日志Ctrl+C 可退出跟随,不会停止容器)..."
  docker compose logs -f --tail=200 backend frontend
fi

80
docker-compose.yml Normal file
View File

@@ -0,0 +1,80 @@
services:
  # FastAPI backend (metrics/trend/AI routes), published on host port 8000.
  backend:
    build:
      context: ./backend
      dockerfile: Dockerfile
    env_file:
      - ./backend/.env
    environment:
      # Allow both the docker-mapped (3001) and local-dev (3000) frontends.
      - APP_CORS_ORIGINS=http://localhost:3000,http://localhost:3001
    ports:
      - "8000:8000"
    depends_on:
      - milvus
    restart: unless-stopped

  # Next.js frontend; container listens on 3000, published on host port 3001.
  frontend:
    build:
      context: ./frontend
      dockerfile: Dockerfile
    env_file:
      # NOTE(review): ships the example env file into the container — confirm a
      # real frontend/.env is not expected here instead.
      - ./frontend/.env.example
    environment:
      # Server-side proxy target (see app/api/[...path]/route.ts).
      - BACKEND_URL=http://backend:8000
    ports:
      - "3001:3000"
    depends_on:
      - backend
    restart: unless-stopped

  # Milvus standalone (vector DB)
  # etcd holds Milvus metadata.
  etcd:
    image: quay.io/coreos/etcd:v3.5.16
    environment:
      - ETCD_AUTO_COMPACTION_MODE=revision
      - ETCD_AUTO_COMPACTION_RETENTION=1000
      - ETCD_QUOTA_BACKEND_BYTES=4294967296
      - ETCD_SNAPSHOT_COUNT=50000
    command: >
      etcd
      -advertise-client-urls=http://0.0.0.0:2379
      -listen-client-urls=http://0.0.0.0:2379
      --data-dir /etcd
    volumes:
      - milvus_etcd:/etcd
    restart: unless-stopped

  # MinIO provides Milvus object storage (web console on 9001).
  minio:
    image: minio/minio:RELEASE.2025-01-20T14-49-07Z
    environment:
      - MINIO_ACCESS_KEY=minioadmin
      - MINIO_SECRET_KEY=minioadmin
    command: minio server /minio_data --console-address ":9001"
    ports:
      - "9000:9000"
      - "9001:9001"
    volumes:
      - milvus_minio:/minio_data
    restart: unless-stopped

  milvus:
    image: milvusdb/milvus:v2.5.6
    command: ["milvus", "run", "standalone"]
    environment:
      - ETCD_ENDPOINTS=etcd:2379
      - MINIO_ADDRESS=minio:9000
    ports:
      - "19530:19530"  # gRPC (client SDK)
      - "9091:9091"    # metrics / health
    volumes:
      - milvus_data:/var/lib/milvus
    depends_on:
      - etcd
      - minio
    restart: unless-stopped

volumes:
  milvus_etcd:
  milvus_minio:
  milvus_data:

20
frontend/Dockerfile Normal file
View File

@@ -0,0 +1,20 @@
# Stage 1: install node_modules (cached until package.json changes).
FROM node:20-slim AS deps
WORKDIR /app
COPY package.json /app/package.json
RUN npm install

# Stage 2: build the Next.js app (standalone output is expected by stage 3).
FROM node:20-slim AS builder
WORKDIR /app
COPY --from=deps /app/node_modules /app/node_modules
COPY . /app
RUN npm run build

# Stage 3: minimal runtime image from the standalone bundle.
FROM node:20-slim AS runner
WORKDIR /app
ENV NODE_ENV=production
COPY --from=builder /app/.next/standalone /app
COPY --from=builder /app/.next/static /app/.next/static
COPY --from=builder /app/public /app/public
EXPOSE 3000
CMD ["node", "server.js"]

View File

@@ -0,0 +1,62 @@
import { NextRequest } from "next/server";
const BACKEND_URL = process.env.BACKEND_URL || "http://localhost:8000";

// GET proxy: rebuild the upstream URL from the catch-all segments + query string.
export async function GET(req: NextRequest, ctx: { params: Promise<{ path: string[] }> }) {
  const { path } = await ctx.params;
  const search = new URL(req.url).search;
  const upstream = `${BACKEND_URL}/api/${path.join("/")}${search}`;
  return await safeUpstreamFetch(() => fetch(upstream, { headers: forwardHeaders(req) }));
}
// POST proxy: forwards the raw body; the stream must be read before fetch.
export async function POST(req: NextRequest, ctx: { params: Promise<{ path: string[] }> }) {
  const { path } = await ctx.params;
  const search = new URL(req.url).search;
  const upstream = `${BACKEND_URL}/api/${path.join("/")}${search}`;
  const body = await req.text();
  const contentType = req.headers.get("content-type") || "application/json";
  return await safeUpstreamFetch(() =>
    fetch(upstream, {
      method: "POST",
      headers: { ...forwardHeaders(req), "content-type": contentType },
      body
    })
  );
}
// Only Authorization is forwarded upstream; host, cookies etc. are dropped.
function forwardHeaders(req: NextRequest) {
  const headers = new Headers();
  const authorization = req.headers.get("authorization");
  if (authorization) {
    headers.set("authorization", authorization);
  }
  return headers;
}
// Copy the upstream response, stripping its CORS header (this proxy is
// same-origin for the browser).
async function forwardResponse(r: Response) {
  const headers = new Headers(r.headers);
  headers.delete("access-control-allow-origin");
  const payload = await r.arrayBuffer();
  return new Response(payload, { status: r.status, headers });
}
// Run the fetch with retries on transient connection errors (e.g. the backend
// container is restarting), using a short linear backoff; non-retryable errors
// and exhausted attempts surface as a JSON 503.
async function safeUpstreamFetch(doFetch: () => Promise<Response>) {
  const maxAttempts = 3;
  for (let i = 0; i < maxAttempts; i++) {
    try {
      const r = await doFetch();
      return await forwardResponse(r);
    } catch (e: any) {
      // Node's fetch wraps the socket error in e.cause.
      const code = e?.cause?.code || e?.code;
      const retryable = code === "ECONNREFUSED" || code === "ENOTFOUND" || code === "EAI_AGAIN";
      if (!retryable || i === maxAttempts - 1) {
        return new Response(
          JSON.stringify({
            detail: "上游后端不可达backend 容器可能正在重启或未就绪)。",
            error: String(code || e?.message || e)
          }),
          { status: 503, headers: { "content-type": "application/json; charset=utf-8" } }
        );
      }
      // Linear backoff: 200ms, then 400ms.
      await new Promise((r) => setTimeout(r, 200 * (i + 1)));
    }
  }
  // Unreachable (every loop path returns); kept for TypeScript's return analysis.
  return new Response(JSON.stringify({ detail: "unknown" }), { status: 503 });
}

15
frontend/app/layout.tsx Normal file
View File

@@ -0,0 +1,15 @@
// App-wide metadata (Next.js App Router convention).
export const metadata = {
  title: "Crawl BI Dashboard",
  description: "Sales BI + Trend Engine + AI insight"
};

// Root layout: bare-bones shell; styling is inline so no CSS pipeline is needed.
export default function RootLayout({ children }: { children: React.ReactNode }) {
  return (
    <html lang="zh-CN">
      <body style={{ margin: 0, fontFamily: "ui-sans-serif, system-ui, -apple-system" }}>
        {children}
      </body>
    </html>
  );
}

6
frontend/app/page.tsx Normal file
View File

@@ -0,0 +1,6 @@
import Dashboard from "../components/Dashboard";

// Home page: the entire UI lives in the client-side Dashboard component.
export default function Page() {
  return <Dashboard />;
}

View File

@@ -0,0 +1,228 @@
"use client";
import React from "react";
import ReactECharts from "echarts-for-react";
// Response shape of GET /api/metrics/overview.
type OverviewResponse = {
  schema: any;
  metrics: Record<string, any>;
};

// One scored row from GET /api/trend/potential-winners.
type Winner = {
  product_id: string;
  title?: string | null;
  category?: string | null;
  units?: number;
  gmv?: number;
  potential_score?: number; // 0..1 (clipped by the backend trend engine)
  burst_score?: number; // 0..1
  follow_score?: number; // 0..1
  lifecycle?: string; // e.g. "early_growth", "red_ocean", "normal"
};
// GET helper: bypasses the Next.js fetch cache (dashboard data must be fresh)
// and throws the response text on any non-2xx status.
async function getJSON<T>(path: string): Promise<T> {
  const res = await fetch(path, { cache: "no-store" });
  if (!res.ok) {
    throw new Error(await res.text());
  }
  return (await res.json()) as T;
}
/**
 * POST `body` as JSON to `path` and return the parsed JSON response.
 * Rejects with an Error carrying the response text on any non-2xx status.
 */
async function postJSON<T>(path: string, body: any): Promise<T> {
  const init: RequestInit = {
    method: "POST",
    headers: { "content-type": "application/json" },
    body: JSON.stringify(body)
  };
  const res = await fetch(path, init);
  if (!res.ok) {
    throw new Error(await res.text());
  }
  const parsed = await res.json();
  return parsed as T;
}
// Client-side BI dashboard: headline KPI cards, a per-product sales/forecast
// chart, the "Potential Winners" ranking, and an AI insight panel. All data is
// fetched through the Next.js /api proxy routes in front of the backend.
export default function Dashboard() {
  // Loaded once on mount.
  const [overview, setOverview] = React.useState<OverviewResponse | null>(null);
  const [winners, setWinners] = React.useState<Winner[]>([]);
  // Currently selected product plus its historical series and forecast points.
  const [productId, setProductId] = React.useState("");
  const [series, setSeries] = React.useState<{ ds: string; units: number; gmv: number }[]>([]);
  const [forecast, setForecast] = React.useState<{ ds: string; units_hat: number }[]>([]);
  // AI panel: editable prompt text and the generated answer.
  const [aiQuery, setAiQuery] = React.useState("给我当前最值得跟卖的3个品类/商品,并说明理由与风险。");
  const [aiAnswer, setAiAnswer] = React.useState<string>("");
  // Last request error, rendered as a banner; reset before each new request.
  const [err, setErr] = React.useState<string>("");
  // Initial load: overview KPIs, then the 14-day potential-winners ranking.
  React.useEffect(() => {
    (async () => {
      try {
        const o = await getJSON<OverviewResponse>("/api/metrics/overview");
        setOverview(o);
        const w = await getJSON<{ items: Winner[] }>("/api/trend/potential-winners?days=14&limit=30");
        setWinners(w.items);
      } catch (e: any) {
        setErr(String(e?.message || e));
      }
    })();
  }, []);
  // Fetch the 60-day sales timeseries and a 14-step forecast for product `p`.
  async function loadProduct(p: string) {
    setErr("");
    if (!p.trim()) {
      setErr("请输入 product_id不能为空");
      return;
    }
    try {
      const ts = await getJSON<{ product_id: string; points: any[] }>(
        `/api/metrics/sales/timeseries?product_id=${encodeURIComponent(p)}&days=60`
      );
      setSeries(ts.points as any);
      const fc = await getJSON<{ forecast: any[] }>(
        `/api/trend/forecast?product_id=${encodeURIComponent(p)}&days=60&horizon=14`
      );
      setForecast(fc.forecast as any);
    } catch (e: any) {
      setErr(String(e?.message || e));
    }
  }
  // Ask the AI insight endpoint; scoped to the selected product when one is set.
  async function runAI() {
    setErr("");
    try {
      const r = await postJSON<{ answer: string }>("/api/ai/insight", {
        query: aiQuery,
        product_id: productId || undefined,
        top_k: 6
      });
      setAiAnswer(r.answer || "");
    } catch (e: any) {
      setErr(String(e?.message || e));
    }
  }
  // ECharts option: units (left axis) and gmv (right axis) as solid lines, the
  // forecast as a dashed line padded with nulls so it starts at the last
  // historical point.
  // NOTE(review): xAxis categories contain only historical dates, yet the
  // forecast data array extends past them — forecast points beyond the first
  // may fall outside the category axis and not render; consider extending
  // xAxis.data with the forecast dates. TODO confirm against ECharts behavior.
  const chartOption = {
    tooltip: { trigger: "axis" },
    legend: { data: ["units", "gmv", "forecast_units"] },
    xAxis: { type: "category", data: series.map((p) => p.ds.slice(0, 10)) },
    yAxis: [{ type: "value" }, { type: "value" }],
    series: [
      { name: "units", type: "line", smooth: true, data: series.map((p) => p.units) },
      { name: "gmv", type: "line", smooth: true, yAxisIndex: 1, data: series.map((p) => p.gmv) },
      {
        name: "forecast_units",
        type: "line",
        smooth: true,
        lineStyle: { type: "dashed" },
        data: new Array(Math.max(0, series.length - 1)).fill(null).concat(forecast.map((f) => f.units_hat))
      }
    ]
  };
  return (
    <div style={{ padding: 18, maxWidth: 1200, margin: "0 auto" }}>
      <div style={{ display: "flex", justifyContent: "space-between", alignItems: "baseline" }}>
        <h2 style={{ margin: 0 }}> BI </h2>
        <div />
      </div>
      {/* Error banner, shown only when a request failed */}
      {err ? (
        <pre style={{ background: "#fff2f0", border: "1px solid #ffccc7", padding: 12, whiteSpace: "pre-wrap" }}>
          {err}
        </pre>
      ) : null}
      {/* Headline KPI cards */}
      <div style={{ display: "grid", gridTemplateColumns: "repeat(4, 1fr)", gap: 12, marginTop: 16 }}>
        <Card title="产品数">{overview?.metrics?.products ?? "-"}</Card>
        <Card title="近30天销量">{overview?.metrics?.units_30d ?? "-"}</Card>
        <Card title="近30天GMV">{overview?.metrics?.gmv_30d ?? "-"}</Card>
        <Card title="近30天记录数">{overview?.metrics?.rows_30d ?? "-"}</Card>
      </div>
      {/* Left: product chart with manual product_id input; right: winners list */}
      <div style={{ display: "grid", gridTemplateColumns: "1.3fr 1fr", gap: 12, marginTop: 16 }}>
        <div style={{ border: "1px solid #eee", borderRadius: 10, padding: 12 }}>
          <div style={{ display: "flex", gap: 8, alignItems: "center" }}>
            <strong>线 + </strong>
            <input
              value={productId}
              onChange={(e) => setProductId(e.target.value)}
              placeholder="输入 product_id"
              style={{ flex: 1, padding: "8px 10px", borderRadius: 8, border: "1px solid #ddd" }}
            />
            <button onClick={() => loadProduct(productId)} style={btn}>
            </button>
          </div>
          <div style={{ marginTop: 12 }}>
            <ReactECharts option={chartOption} style={{ height: 320 }} />
          </div>
        </div>
        <div style={{ border: "1px solid #eee", borderRadius: 10, padding: 12 }}>
          <strong>Potential Winners</strong>
          <div style={{ marginTop: 10, maxHeight: 360, overflow: "auto" }}>
            {/* Clicking a winner selects it and loads its chart */}
            {winners.map((w) => (
              <div
                key={w.product_id}
                style={{
                  padding: "10px 10px",
                  border: "1px solid #f0f0f0",
                  borderRadius: 10,
                  marginBottom: 10,
                  cursor: "pointer"
                }}
                onClick={() => {
                  setProductId(w.product_id);
                  loadProduct(w.product_id);
                }}
              >
                <div style={{ display: "flex", justifyContent: "space-between", gap: 10 }}>
                  <div style={{ fontWeight: 600 }}>{w.title || w.product_id}</div>
                  <div style={{ fontSize: 12, opacity: 0.8 }}>{w.lifecycle}</div>
                </div>
                <div style={{ fontSize: 12, opacity: 0.85, marginTop: 6 }}>
                  units={fmt(w.units)} gmv={fmt(w.gmv)} · potential={fmt(w.potential_score)} · burst={fmt(w.burst_score)} ·
                  follow={fmt(w.follow_score)}
                </div>
              </div>
            ))}
          </div>
        </div>
      </div>
      {/* AI insight panel: prompt textarea + generated answer */}
      <div style={{ border: "1px solid #eee", borderRadius: 10, padding: 12, marginTop: 16 }}>
        <div style={{ display: "flex", gap: 8, alignItems: "center" }}>
          <strong>AI </strong>
          <button onClick={runAI} style={btn}>
          </button>
        </div>
        <textarea
          value={aiQuery}
          onChange={(e) => setAiQuery(e.target.value)}
          rows={3}
          style={{ width: "100%", marginTop: 10, padding: 10, borderRadius: 10, border: "1px solid #ddd" }}
        />
        <pre style={{ marginTop: 10, background: "#0b1220", color: "#e6edf3", padding: 12, borderRadius: 10, whiteSpace: "pre-wrap" }}>
          {aiAnswer || "(未生成)"}
        </pre>
      </div>
    </div>
  );
}
/** Small KPI tile: muted caption on top, large bold value underneath. */
function Card({ title, children }: { title: string; children: React.ReactNode }) {
  const frame = { border: "1px solid #eee", borderRadius: 10, padding: 12 };
  const caption = { fontSize: 12, opacity: 0.75 };
  const value = { fontSize: 20, fontWeight: 700, marginTop: 6 };
  return (
    <div style={frame}>
      <div style={caption}>{title}</div>
      <div style={value}>{children}</div>
    </div>
  );
}
// Shared inline style for the dashboard's action buttons.
const btn: React.CSSProperties = {
  padding: "8px 12px",
  borderRadius: 10,
  border: "1px solid #ddd",
  background: "#fff",
  cursor: "pointer"
};
/**
 * Format an arbitrary metric for display: "-" for null/undefined and
 * non-finite numbers; finite numbers with up to 3 decimals, trailing zeros
 * (and a bare decimal point) trimmed; anything else via String().
 */
function fmt(v: any) {
  if (v == null) return "-";
  if (typeof v !== "number") return String(v);
  if (!Number.isFinite(v)) return "-";
  const fixed = v.toFixed(3);
  return fixed.replace(/\.?0+$/, "");
}

3
frontend/next-env.d.ts vendored Normal file
View File

@@ -0,0 +1,3 @@
/// <reference types="next" />
/// <reference types="next/image-types/global" />

8
frontend/next.config.js Normal file
View File

@@ -0,0 +1,8 @@
/** @type {import('next').NextConfig} */
const nextConfig = {
  reactStrictMode: true,
  // "standalone" emits a minimal self-contained server build, suited to
  // container deploys (copies only the files needed to run `server.js`).
  output: "standalone"
};
module.exports = nextConfig;

24
frontend/package.json Normal file
View File

@@ -0,0 +1,24 @@
{
"name": "crawl-demo-frontend",
"private": true,
"version": "0.1.0",
"scripts": {
"dev": "next dev -p 3000",
"build": "next build",
"start": "next start -p 3000",
"lint": "next lint"
},
"dependencies": {
"echarts": "^5.6.0",
"echarts-for-react": "^3.0.2",
"next": "^15.2.2",
"react": "^19.0.0",
"react-dom": "^19.0.0"
},
"devDependencies": {
"@types/node": "^22.13.10",
"@types/react": "^19.0.12",
"typescript": "^5.8.2"
}
}

1
frontend/public/.gitkeep Normal file
View File

@@ -0,0 +1 @@

21
frontend/tsconfig.json Normal file
View File

@@ -0,0 +1,21 @@
{
"compilerOptions": {
"target": "ES2022",
"lib": ["dom", "dom.iterable", "esnext"],
"allowJs": false,
"skipLibCheck": true,
"strict": true,
"noEmit": true,
"esModuleInterop": true,
"module": "esnext",
"moduleResolution": "bundler",
"resolveJsonModule": true,
"isolatedModules": true,
"jsx": "preserve",
"incremental": true,
"types": ["node"]
},
"include": ["next-env.d.ts", "**/*.ts", "**/*.tsx"],
"exclude": ["node_modules"]
}