commit d0ff049899108cf83adfb5479f594fdf0f99660d Author: Daniel Date: Wed Mar 18 18:57:58 2026 +0800 feat: new file diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..5f05ba9 --- /dev/null +++ b/.gitignore @@ -0,0 +1,16 @@ +**/.DS_Store +**/.env +**/.env.* +**/node_modules +**/.next +**/dist +**/__pycache__ +**/*.pyc +**/.pytest_cache +**/.venv +**/venv +**/data +**/*.log + +# local secrets / notes +sql.md diff --git a/README.md b/README.md new file mode 100644 index 0000000..3bdd567 --- /dev/null +++ b/README.md @@ -0,0 +1,70 @@ +# Crawl_demo — BI 看板 + 趋势引擎 + AI 分析 + +本项目从 MySQL(见 `sql.md`)抽取电商数据,生成 BI 数据看板,并实现: + +- 实时曲线:销量/GMV/排名等核心指标的近实时刷新 +- 趋势预测:潜伏期识别、爆发力评分、时间序列预测 +- 决策辅助:跟卖指数、生命周期预警 +- AI 分析:基于向量库检索 + LLM,总结“为什么它在涨、是否值得跟卖、风险点” + +## 目录 + +- `backend/`:FastAPI(指标 API、趋势引擎、AI 检索/分析、ETL 任务) +- `frontend/`:Next.js(BI 看板 UI) + +## 快速开始 + +1) 复制环境变量 + +```bash +cp backend/.env.example backend/.env +cp frontend/.env.example frontend/.env.local +``` + +### 一键 Docker 启动(推荐) + +在项目根目录执行: + +```bash +sh dev.sh +``` + +然后打开: + +- 前端:`http://localhost:3001` +- 后端:`http://localhost:8000/docs` + +启动脚本会自动输出: + +- 容器 `ps` 状态 +- 前后端最近日志(tail) +- HTTP 快速自检结果(backend/frontend/api-proxy) + +2) 启动后端与前端(本地开发) + +```bash +cd backend && python -m venv .venv && source .venv/bin/activate +pip install -r requirements.txt +uvicorn app.main:app --reload --port 8000 +``` + +```bash +cd frontend +npm install +npm run dev +``` + +3) 打开页面 + +- 前端:`http://localhost:3000` +- 后端:`http://localhost:8000/docs` + +## AI / 向量库 + +本项目默认支持 Milvus(向量数据库)。你也可以替换为 Azure SQL / Azure AI Search 等“MS 系”向量能力: + +- 先把 embedding 与 upsert 部分抽象在 `backend/app/vector/` 下 +- 替换实现即可 + +> 如果未配置 `OPENAI_API_KEY`,AI 分析接口会降级为规则引擎的“可解释摘要”,不调用 LLM。 + diff --git a/backend/Dockerfile b/backend/Dockerfile new file mode 100644 index 0000000..d731768 --- /dev/null +++ b/backend/Dockerfile @@ -0,0 +1,18 @@ +FROM python:3.12-slim + +WORKDIR /app + +ENV PYTHONDONTWRITEBYTECODE=1 +ENV PYTHONUNBUFFERED=1 + +RUN 
# --- backend/app/db.py ---
from __future__ import annotations

from functools import lru_cache

from sqlalchemy import create_engine
from sqlalchemy.engine import Engine

from .settings import settings


@lru_cache(maxsize=1)
def get_engine() -> Engine:
    """Return the process-wide SQLAlchemy engine (created lazily, then cached).

    Fix: the original constructed a brand-new engine -- and therefore a
    brand-new connection pool -- on every call.  Every route handler calls
    ``get_engine()`` per request, so pooling never took effect.  ``lru_cache``
    turns this into a lazy singleton.
    """
    return create_engine(
        settings.mysql_url,
        pool_pre_ping=True,   # probe connections before use; MySQL drops idle ones
        pool_recycle=3600,    # recycle connections older than 1h
    )


# --- backend/app/main.py ---
from fastapi import FastAPI, Request
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import JSONResponse
from sqlalchemy.exc import OperationalError

from .settings import settings
from .routes import ai, debug, metrics, trend
from .services.db_sample import print_db_sample_to_logs


app = FastAPI(title="Crawl BI Backend", version="0.1.0")


@app.exception_handler(OperationalError)
async def db_operational_error_handler(_: Request, exc: OperationalError):
    """Translate DB connectivity failures into a 503 with an actionable hint."""
    return JSONResponse(
        status_code=503,
        content={
            "detail": "数据库连接失败(请检查 MYSQL_HOST/USER/PASSWORD 以及 MySQL 授权 host/IP 白名单)。",
            # exc.orig carries the driver-level error; prefer it when present.
            "error": str(exc.orig) if getattr(exc, "orig", None) else str(exc),
        },
    )


app.add_middleware(
    CORSMiddleware,
    allow_origins=settings.cors_origins_list,
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

app.include_router(metrics.router, prefix="/api/metrics", tags=["metrics"])
app.include_router(trend.router, prefix="/api/trend", tags=["trend"])
app.include_router(ai.router, prefix="/api/ai", tags=["ai"])
app.include_router(debug.router, prefix="/api/debug", tags=["debug"])


@app.on_event("startup")
def _startup_print_sample() -> None:
    # Optionally dump a DB sample to the logs at boot (dev aid, off by default).
    if settings.debug_print_db_sample:
        print_db_sample_to_logs()


# --- backend/app/routes/ai.py ---
from fastapi import APIRouter
from pydantic import BaseModel, Field

from ..services.ai_insight import generate_insight


router = APIRouter()


class InsightRequest(BaseModel):
    """Request payload for POST /api/ai/insight."""

    query: str = Field(..., min_length=1, max_length=2000)
    product_id: str | None = None       # optional: focus the analysis on one product
    top_k: int = Field(6, ge=1, le=20)  # number of vector-search hits to consider


@router.post("/insight")
def insight(req: InsightRequest):
    """
    Vector retrieval + optional LLM: returns a "discover hot item -> validate
    with data -> follow-sell decision" recommendation.
    """
    return generate_insight(query=req.query, product_id=req.product_id, top_k=req.top_k)


# --- backend/app/routes/debug.py ---
from fastapi import APIRouter, Query

from ..services.db_sample import print_db_sample_to_logs


router = APIRouter()


@router.post("/db-sample")
def db_sample(limit: int = Query(10, ge=1, le=200)):
    """
    Trigger one "print DB sample rows to the backend log" pass — a quick way
    to inspect the data's content and structure during development.
    """
    print_db_sample_to_logs(limit=limit)
    return {"ok": True, "printed": True, "limit": limit}
# --- backend/app/routes/metrics.py ---
from __future__ import annotations

from datetime import datetime, timedelta

import pandas as pd
from fastapi import APIRouter, HTTPException, Query
from sqlalchemy import text

from ..db import get_engine
from ..services.schema_discovery import discover_schema
from ..services.timeseries import normalize_timeseries


router = APIRouter()


@router.get("/overview")
def overview():
    """Top-of-dashboard KPI cards, derived from the auto-discovered schema."""
    engine = get_engine()
    schema = discover_schema(engine)
    if not schema.sales_table:
        raise HTTPException(status_code=422, detail="未发现可用销量/订单明细表(需要至少包含 product_id + 时间 + 数量/金额)")

    q = text(schema.overview_sql)
    with engine.connect() as conn:
        row = conn.execute(q).mappings().first()
    return {"schema": schema.model_dump(), "metrics": dict(row) if row else {}}


@router.get("/sales/timeseries")
def sales_timeseries(
    product_id: str = Query(..., min_length=1),
    days: int = Query(30, ge=1, le=365),
):
    """Daily units/GMV series for one product over the trailing `days` window."""
    engine = get_engine()
    schema = discover_schema(engine)
    if not schema.sales_table:
        raise HTTPException(status_code=422, detail="未发现可用销量/订单明细表")

    # Naive UTC on purpose: compared against MySQL DATETIME columns.
    since = datetime.utcnow() - timedelta(days=days)
    q = text(schema.timeseries_sql)
    with engine.connect() as conn:
        df = pd.read_sql(q, conn, params={"product_id": product_id, "since": since})
    if df.empty:
        return {"product_id": product_id, "points": []}

    points = normalize_timeseries(df, ts_col="ds", value_cols=["units", "gmv"])
    return {"product_id": product_id, "points": points}


# --- backend/app/routes/trend.py ---
import math

from ..services.forecast import forecast_next_n
from ..services.trend_engine import compute_trend_scores


router = APIRouter()


@router.get("/potential-winners")
def potential_winners(days: int = Query(14, ge=3, le=60), limit: int = Query(50, ge=1, le=200)):
    """Rank products by `potential_score` over the trailing `days` window."""
    engine = get_engine()
    schema = discover_schema(engine)
    if not schema.sales_table:
        raise HTTPException(status_code=422, detail="未发现可用销量/订单明细表")

    since = datetime.utcnow() - timedelta(days=days)
    q = text(schema.trend_candidates_sql)
    with engine.connect() as conn:
        # Over-fetch (5x) so scoring has candidates to discard before the cut.
        df = pd.read_sql(q, conn, params={"since": since, "limit": limit * 5})
    if df.empty:
        return {"items": []}

    scored = compute_trend_scores(df)
    scored = scored.sort_values("potential_score", ascending=False).head(limit)
    return {"items": scored.to_dict(orient="records")}


@router.get("/forecast")
def forecast(
    product_id: str = Query(..., min_length=1),
    days: int = Query(30, ge=7, le=180),
    horizon: int = Query(14, ge=1, le=60),
):
    """Units forecast for one product: fit on `days` of history, predict `horizon` days."""
    engine = get_engine()
    schema = discover_schema(engine)
    if not schema.sales_table:
        raise HTTPException(status_code=422, detail="未发现可用销量/订单明细表")

    since = datetime.utcnow() - timedelta(days=days)
    q = text(schema.timeseries_sql)
    with engine.connect() as conn:
        df = pd.read_sql(q, conn, params={"product_id": product_id, "since": since})
    if df.empty:
        return {"product_id": product_id, "forecast": []}

    df = df.sort_values("ds")
    y = df["units"].astype(float).fillna(0.0).values
    yhat = forecast_next_n(y, n=horizon)
    start = pd.to_datetime(df["ds"]).max()
    out = []
    for i, v in enumerate(yhat, start=1):
        v = float(v)
        # Fix: guard non-finite forecaster output -- `inf` would previously
        # survive the clamp and break JSON serialization of the response.
        units_hat = max(0.0, v) if math.isfinite(v) else 0.0
        out.append({"ds": (start + pd.Timedelta(days=i)).to_pydatetime().isoformat(), "units_hat": units_hat})
    return {"product_id": product_id, "forecast": out}
from __future__ import annotations

from typing import Any

try:
    # Project settings.  Optional so the rules-only path stays usable when the
    # module is imported standalone (scripts, tests) outside the package.
    from ..settings import settings
except ImportError:  # standalone import: behave as if no LLM is configured
    settings = None


def _rule_based_summary(query: str, retrieved: list[dict[str, Any]]) -> dict[str, Any]:
    """Explainable, LLM-free summary built from the top-3 retrieved items."""
    top = retrieved[:3]
    bullets = []
    for r in top:
        pid = r.get("product_id") or r.get("id") or "-"
        title = r.get("title") or ""
        follow = r.get("follow_score")
        life = r.get("lifecycle")
        bullets.append(f"- {pid} {title}(跟卖指数={follow} 生命周期={life})")
    return {
        "mode": "rules_only",
        "query": query,
        "retrieved": retrieved,
        "answer": "基于当前向量库/指标的规则摘要:\n" + "\n".join(bullets),
    }


def generate_insight(query: str, product_id: str | None, top_k: int) -> dict[str, Any]:
    """
    Minimal runnable loop:
    - vector retrieval (returns empty until the vector store is wired up)
    - with OPENAI_API_KEY configured, call the LLM for structured advice
    - otherwise fall back to the rule-engine summary

    `product_id` / `top_k` are reserved for retrieval filtering once the
    vector search lands.
    """
    # TODO: 接入向量库检索(Milvus/Azure 等)。先保留协议,保证前端可用。
    retrieved: list[dict[str, Any]] = []

    if settings is None or not settings.openai_api_key:
        return _rule_based_summary(query, retrieved)

    prompt = f"""你是电商数据分析与选品决策助手。
用户问题:{query}

请输出一个“发现爆款 -> 数据验证 -> 决策跟卖”的闭环建议,包含:
1) 结论摘要(3-5条)
2) 数据证据(引用关键指标:销量/增速/竞争/生命周期)
3) 风险点与反例(至少3条)
4) 可执行动作(选品、备货、投流、供应链)
如果没有足够数据,请明确说明缺口,并给出最小补充数据清单。
"""

    headers = {"Authorization": f"Bearer {settings.openai_api_key}"}
    payload = {
        "model": settings.openai_model,
        "input": prompt,
    }

    try:
        # Lazy import: httpx is only needed on the LLM path; a missing
        # dependency degrades to the rules fallback like any other failure.
        import httpx

        with httpx.Client(timeout=30.0) as client:
            r = client.post("https://api.openai.com/v1/responses", headers=headers, json=payload)
            r.raise_for_status()
            data = r.json()
            # Concatenate all text fragments from the Responses API output.
            text = ""
            for item in data.get("output", []):
                for c in item.get("content", []):
                    if c.get("type") in ("output_text", "text"):
                        text += c.get("text", "")
            return {"mode": "llm", "query": query, "retrieved": retrieved, "answer": text.strip()}
    except Exception as e:
        # Any failure (network, auth, missing httpx) falls back to rules,
        # surfacing the error so the caller can see why.
        out = _rule_based_summary(query, retrieved)
        out["mode"] = "rules_fallback"
        out["error"] = str(e)
        return out
from __future__ import annotations

import json
import logging
from typing import Any

import pandas as pd
from sqlalchemy import Engine, text

from ..db import get_engine
from ..services.schema_discovery import discover_schema
from ..settings import settings

log = logging.getLogger("db_sample")


def _truncate_value(v: Any, max_len: int) -> Any:
    """Stringify and truncate a cell value so one wide column can't flood a log line."""
    if v is None:
        return None
    if isinstance(v, (int, float, bool)):
        return v  # scalars are short; keep the native type for JSON output
    s = str(v)
    if len(s) <= max_len:
        return s
    return s[: max_len - 3] + "..."


def _df_to_records(df: pd.DataFrame, max_str_len: int) -> list[dict[str, Any]]:
    """Convert sample rows into JSON-safe dicts with truncated string values."""
    out: list[dict[str, Any]] = []
    for _, row in df.iterrows():
        rec: dict[str, Any] = {}
        for k, v in row.items():
            rec[str(k)] = _truncate_value(v, max_str_len)
        out.append(rec)
    return out


def _list_tables(engine: Engine) -> list[str]:
    """All table names in the currently selected database."""
    with engine.connect() as conn:
        rows = conn.execute(
            text(
                """
                SELECT table_name
                FROM information_schema.tables
                WHERE table_schema = DATABASE()
                ORDER BY table_name
                """
            )
        ).all()
    return [r[0] for r in rows]


def _table_row_count(engine: Engine, table: str) -> int | None:
    """COUNT(*) for `table`, or None when the query fails (permissions, locks...)."""
    try:
        with engine.connect() as conn:
            # Fix: backtick-quote the identifier.  Table names come from
            # information_schema (not user input), but an unquoted
            # reserved-word table name would still break the query.
            v = conn.execute(text(f"SELECT COUNT(*) FROM `{table}`")).scalar()
        return int(v) if v is not None else None
    except Exception:
        return None


def _sample_table(engine: Engine, table: str, limit: int) -> pd.DataFrame:
    """First `limit` rows of `table` (no ordering guarantee)."""
    with engine.connect() as conn:
        return pd.read_sql(text(f"SELECT * FROM `{table}` LIMIT {int(limit)}"), conn)


def print_db_sample_to_logs(limit: int | None = None) -> None:
    """
    Log the discovered schema plus sample rows per table — a dev aid for
    understanding the data's content and structure.

    Long strings are truncated (see settings) to keep log volume bounded.
    """
    engine = get_engine()
    schema = discover_schema(engine)

    eff_limit = int(limit or settings.debug_db_sample_limit)
    max_str_len = int(settings.debug_db_sample_max_str_len)

    tables = _list_tables(engine)
    log.warning("DB SAMPLE: discovered schema=%s", schema.model_dump())
    log.warning("DB SAMPLE: tables=%s", tables)

    # Sample the discovered sales/products tables first, then everything else.
    prioritized: list[str] = []
    for t in [schema.sales_table, schema.products_table]:
        if t and t in tables and t not in prioritized:
            prioritized.append(t)
    for t in tables:
        if t not in prioritized:
            prioritized.append(t)

    for t in prioritized[:8]:  # cap: at most 8 tables per dump
        cnt = _table_row_count(engine, t)
        try:
            df = _sample_table(engine, t, eff_limit)
            recs = _df_to_records(df, max_str_len=max_str_len)
            log.warning(
                "DB SAMPLE: table=%s rows=%s cols=%s sample=%s",
                t,
                cnt,
                list(df.columns),
                json.dumps(recs, ensure_ascii=False),
            )
        except Exception as e:
            # Best-effort: a single unreadable table must not abort the dump.
            log.warning("DB SAMPLE: table=%s rows=%s sample_failed=%s", t, cnt, str(e))
= _list_tables(engine) + log.warning("DB SAMPLE: discovered schema=%s", schema.model_dump()) + log.warning("DB SAMPLE: tables=%s", tables) + + # prioritize discovered tables first + prioritized: list[str] = [] + for t in [schema.sales_table, schema.products_table]: + if t and t in tables and t not in prioritized: + prioritized.append(t) + for t in tables: + if t not in prioritized: + prioritized.append(t) + + for t in prioritized[: min(len(prioritized), 8)]: + cnt = _table_row_count(engine, t) + try: + df = _sample_table(engine, t, eff_limit) + recs = _df_to_records(df, max_str_len=max_str_len) + log.warning( + "DB SAMPLE: table=%s rows=%s cols=%s sample=%s", + t, + cnt, + list(df.columns), + json.dumps(recs, ensure_ascii=False), + ) + except Exception as e: + log.warning("DB SAMPLE: table=%s rows=%s sample_failed=%s", t, cnt, str(e)) + diff --git a/backend/app/services/forecast.py b/backend/app/services/forecast.py new file mode 100644 index 0000000..eba80d2 --- /dev/null +++ b/backend/app/services/forecast.py @@ -0,0 +1,28 @@ +from __future__ import annotations + +import numpy as np +from statsmodels.tsa.holtwinters import ExponentialSmoothing + + +def forecast_next_n(y: np.ndarray, n: int) -> np.ndarray: + """ + 轻量级预测:优先 Holt-Winters(对短序列也相对稳),失败则用简单移动平均。 + """ + y = np.asarray(y, dtype=float) + if y.size < 3: + return np.repeat(y[-1] if y.size else 0.0, n) + + try: + model = ExponentialSmoothing( + y, + trend="add", + seasonal=None, + initialization_method="estimated", + ) + fit = model.fit(optimized=True) + return np.asarray(fit.forecast(n), dtype=float) + except Exception: + window = int(min(7, max(3, y.size // 2))) + avg = float(np.mean(y[-window:])) if y.size else 0.0 + return np.repeat(avg, n) + diff --git a/backend/app/services/schema_discovery.py b/backend/app/services/schema_discovery.py new file mode 100644 index 0000000..8f679aa --- /dev/null +++ b/backend/app/services/schema_discovery.py @@ -0,0 +1,223 @@ +from __future__ import annotations + +from 
from __future__ import annotations

from dataclasses import dataclass  # noqa: F401  (kept: may be used by other modules importing from here)

from pydantic import BaseModel, PrivateAttr
from sqlalchemy import Engine, text


class DiscoveredSchema(BaseModel):
    """Result of best-effort schema discovery plus the SQL derived from it.

    NOTE: table/column names below are interpolated into SQL via f-strings.
    They originate from information_schema (not user input), so this is not
    an injection vector, but identifiers are unquoted -- reserved-word names
    would need backticks.
    """

    # discovered table names
    sales_table: str | None = None
    products_table: str | None = None

    # required columns (in sales_table)
    sales_product_id_col: str | None = None
    sales_time_col: str | None = None
    sales_units_col: str | None = None
    sales_amount_col: str | None = None

    # optional product cols
    product_title_col: str | None = None
    product_created_col: str | None = None
    product_rank_col: str | None = None
    product_category_col: str | None = None
    product_desc_col: str | None = None

    # Fix: the original stashed this flag via setattr()/getattr() under an
    # undeclared name -- pydantic v2 rejects setting undeclared attributes
    # ("object has no field ..."), so discovery crashed whenever a products
    # table was found.  PrivateAttr is the supported mechanism for non-field
    # state on a model, and stays out of model_dump() like the original intent.
    _products_pid_same: bool = PrivateAttr(default=False)

    @property
    def overview_sql(self) -> str:
        # minimal, safe aggregations over the trailing 30 days
        t = self.sales_table
        pid = self.sales_product_id_col
        ts = self.sales_time_col
        units = self.sales_units_col
        amount = self.sales_amount_col
        return f"""
        SELECT
          COUNT(DISTINCT {pid}) AS products,
          SUM(COALESCE({units}, 0)) AS units_30d,
          SUM(COALESCE({amount}, 0)) AS gmv_30d,
          COUNT(*) AS rows_30d
        FROM {t}
        WHERE {ts} >= (UTC_TIMESTAMP() - INTERVAL 30 DAY)
        """

    @property
    def timeseries_sql(self) -> str:
        # per-day units/gmv for one product (:product_id, :since bound params)
        t = self.sales_table
        pid = self.sales_product_id_col
        ts = self.sales_time_col
        units = self.sales_units_col
        amount = self.sales_amount_col
        return f"""
        SELECT
          DATE({ts}) AS ds,
          SUM(COALESCE({units}, 0)) AS units,
          SUM(COALESCE({amount}, 0)) AS gmv
        FROM {t}
        WHERE {pid} = :product_id
          AND {ts} >= :since
        GROUP BY DATE({ts})
        ORDER BY ds ASC
        """

    @property
    def trend_candidates_sql(self) -> str:
        # per-product last-N-day rollups; join products when the product-id
        # column name is confidently shared between the two tables
        t = self.sales_table
        pid = self.sales_product_id_col
        ts = self.sales_time_col
        units = self.sales_units_col
        amount = self.sales_amount_col

        p = self.products_table
        title = self.product_title_col
        created = self.product_created_col
        rank = self.product_rank_col
        cat = self.product_category_col

        join = ""
        if p and self._products_has_same_pid_name:
            join = f"LEFT JOIN {p} p ON p.{pid} = s.{pid}"

        # if we can't confidently join, still return sales-only metrics
        select_p = ""
        if p and join:
            title_expr = f"p.{title}" if title else "NULL"
            cat_expr = f"p.{cat}" if cat else "NULL"
            created_expr = f"p.{created}" if created else "NULL"
            rank_expr = f"p.{rank}" if rank else "NULL"
            select_p = f""",
          {title_expr} AS title,
          {cat_expr} AS category,
          {created_expr} AS created_at,
          {rank_expr} AS rank_now
            """
        return f"""
        SELECT
          s.{pid} AS product_id,
          SUM(COALESCE(s.{units}, 0)) AS units,
          SUM(COALESCE(s.{amount}, 0)) AS gmv,
          COUNT(*) AS records,
          MIN(s.{ts}) AS first_seen,
          MAX(s.{ts}) AS last_seen
          {select_p}
        FROM {t} s
        {join}
        WHERE s.{ts} >= :since
        GROUP BY s.{pid}
        ORDER BY units DESC
        LIMIT :limit
        """

    @property
    def _products_has_same_pid_name(self) -> bool:
        # True when the products table exposes the same product-id column name
        # as the sales table, i.e. the LEFT JOIN above is safe to emit.
        return self._products_pid_same

    def set_products_pid_same(self, v: bool) -> None:
        self._products_pid_same = bool(v)


SALES_UNITS_CANDIDATES = ["units", "qty", "quantity", "sales", "sold", "order_qty", "num"]
SALES_AMOUNT_CANDIDATES = ["amount", "gmv", "revenue", "pay_amount", "total", "price", "order_amount"]
TIME_CANDIDATES = ["created_at", "create_time", "created", "ts", "timestamp", "date_time", "paid_at", "order_time"]
PID_CANDIDATES = ["product_id", "item_id", "sku_id", "goods_id", "asin"]
PRODUCT_TITLE_CANDIDATES = ["title", "name", "product_name", "item_title"]
PRODUCT_DESC_CANDIDATES = ["description", "desc", "detail"]
PRODUCT_CREATED_CANDIDATES = ["created_at", "create_time", "created"]
PRODUCT_RANK_CANDIDATES = ["rank", "bsr_rank", "position"]
PRODUCT_CATEGORY_CANDIDATES = ["category", "cat", "category_name"]


def _lower(s: str | None) -> str:
    """Lower-case a possibly-None string ('' for None)."""
    return (s or "").lower()


def _pick(cols: list[str], candidates: list[str]) -> str | None:
    """First candidate present in `cols` (case-insensitive), returned with its original casing."""
    cols_l = {_lower(c): c for c in cols}
    for cand in candidates:
        if cand in cols_l:
            return cols_l[cand]
    return None


def discover_schema(engine: Engine) -> DiscoveredSchema:
    """
    在未知表结构的情况下做“足够稳妥”的自动发现:
    - 优先寻找包含 product_id + 时间 + 数量/金额 的表作为 sales_table
    - 寻找包含 title/name 等列的表作为 products_table
    """
    with engine.connect() as conn:
        rows = conn.execute(
            text(
                """
                SELECT table_name, column_name
                FROM information_schema.columns
                WHERE table_schema = DATABASE()
                ORDER BY table_name, ordinal_position
                """
            )
        ).all()

    by_table: dict[str, list[str]] = {}
    for t, c in rows:
        by_table.setdefault(t, []).append(c)

    best_sales: tuple[int, str, dict[str, str]] | None = None
    best_products: tuple[int, str, dict[str, str]] | None = None

    for t, cols in by_table.items():
        pid = _pick(cols, PID_CANDIDATES)
        ts = _pick(cols, TIME_CANDIDATES)
        units = _pick(cols, SALES_UNITS_CANDIDATES)
        amount = _pick(cols, SALES_AMOUNT_CANDIDATES)

        # Weighted scoring: pid/time weigh 3 each, quantity 2, amount 1;
        # the >= 6 threshold requires at least two strong signals.
        score = 0
        if pid:
            score += 3
        if ts:
            score += 3
        if units:
            score += 2
        if amount:
            score += 1

        if score >= 6 and (best_sales is None or score > best_sales[0]):
            best_sales = (score, t, {"pid": pid, "ts": ts, "units": units, "amount": amount})

        title = _pick(cols, PRODUCT_TITLE_CANDIDATES)
        if title:
            pscore = 2
            if _pick(cols, PID_CANDIDATES):
                pscore += 2
            if _pick(cols, PRODUCT_CATEGORY_CANDIDATES):
                pscore += 1
            if _pick(cols, PRODUCT_DESC_CANDIDATES):
                pscore += 1
            if best_products is None or pscore > best_products[0]:
                best_products = (pscore, t, {"title": title})

    schema = DiscoveredSchema()
    if best_sales:
        _, t, m = best_sales
        schema.sales_table = t
        schema.sales_product_id_col = m["pid"]
        schema.sales_time_col = m["ts"]
        schema.sales_units_col = m["units"] or m["amount"]  # last resort: amount as a units proxy
        schema.sales_amount_col = m["amount"] or m["units"]

    if best_products:
        _, pt, _ = best_products
        schema.products_table = pt
        cols = by_table.get(pt, [])
        schema.product_title_col = _pick(cols, PRODUCT_TITLE_CANDIDATES)
        schema.product_desc_col = _pick(cols, PRODUCT_DESC_CANDIDATES)
        schema.product_created_col = _pick(cols, PRODUCT_CREATED_CANDIDATES)
        schema.product_rank_col = _pick(cols, PRODUCT_RANK_CANDIDATES)
        schema.product_category_col = _pick(cols, PRODUCT_CATEGORY_CANDIDATES)
        schema.set_products_pid_same(_pick(cols, PID_CANDIDATES) == schema.sales_product_id_col)

    return schema
schema.product_title_col = _pick(cols, PRODUCT_TITLE_CANDIDATES) + schema.product_desc_col = _pick(cols, PRODUCT_DESC_CANDIDATES) + schema.product_created_col = _pick(cols, PRODUCT_CREATED_CANDIDATES) + schema.product_rank_col = _pick(cols, PRODUCT_RANK_CANDIDATES) + schema.product_category_col = _pick(cols, PRODUCT_CATEGORY_CANDIDATES) + schema.set_products_pid_same(_pick(cols, PID_CANDIDATES) == schema.sales_product_id_col) + + return schema + diff --git a/backend/app/services/timeseries.py b/backend/app/services/timeseries.py new file mode 100644 index 0000000..1a2d6d7 --- /dev/null +++ b/backend/app/services/timeseries.py @@ -0,0 +1,21 @@ +from __future__ import annotations + +import pandas as pd + + +def normalize_timeseries(df: pd.DataFrame, ts_col: str, value_cols: list[str]) -> list[dict]: + df = df.copy() + df[ts_col] = pd.to_datetime(df[ts_col], errors="coerce") + df = df.dropna(subset=[ts_col]).sort_values(ts_col) + out = [] + for _, r in df.iterrows(): + p = {"ds": r[ts_col].to_pydatetime().isoformat()} + for c in value_cols: + v = r.get(c) + try: + p[c] = float(v) if v is not None else 0.0 + except Exception: + p[c] = 0.0 + out.append(p) + return out + diff --git a/backend/app/services/trend_engine.py b/backend/app/services/trend_engine.py new file mode 100644 index 0000000..1c6a7d9 --- /dev/null +++ b/backend/app/services/trend_engine.py @@ -0,0 +1,108 @@ +from __future__ import annotations + +import math + +import numpy as np +import pandas as pd + + +def _sigmoid(x: float) -> float: + return 1.0 / (1.0 + math.exp(-x)) + + +def compute_trend_scores(df: pd.DataFrame) -> pd.DataFrame: + """ + 在“表结构不确定”的情况下,先基于可用字段做可解释评分: + - 潜伏期识别(Potential Winners):新上架/刚出现 + 指标增长快 + - 爆发力:若存在 tiktok/search 等字段则融合,否则使用销量/GMV加速度代理 + - 决策建议:跟卖指数 + 生命周期预警(缺失字段会自动降权) + """ + out = df.copy() + now = pd.Timestamp.now(tz="UTC") + + # 新品/潜伏期:first_seen 越近、同时 units/gmv 相对高 -> potential + first_seen = pd.to_datetime(out.get("first_seen"), errors="coerce", utc=True) + age_days = 
(now - first_seen).dt.total_seconds() / 86400.0 + age_days = age_days.fillna(999.0).clip(lower=0.0) + + units = pd.to_numeric(out.get("units"), errors="coerce").fillna(0.0) + gmv = pd.to_numeric(out.get("gmv"), errors="coerce").fillna(0.0) + + # 规模归一:log1p 降噪 + units_s = np.log1p(units) + gmv_s = np.log1p(gmv) + + freshness = 1.0 / (1.0 + (age_days / 7.0)) # 0~1 + scale = (units_s.rank(pct=True) * 0.6 + gmv_s.rank(pct=True) * 0.4).clip(0.0, 1.0) + out["potential_score"] = (freshness * 0.55 + scale * 0.45).clip(0.0, 1.0) + + # 爆发力:优先融合可选字段 + tiktok_raw = out["tiktok_hot"] if "tiktok_hot" in out.columns else pd.Series(np.nan, index=out.index) + search_raw = out["search_growth"] if "search_growth" in out.columns else pd.Series(np.nan, index=out.index) + tiktok = pd.to_numeric(tiktok_raw, errors="coerce") + search_g = pd.to_numeric(search_raw, errors="coerce") + if tiktok.notna().any() or search_g.notna().any(): + tiktok_s = tiktok.fillna(tiktok.median() if tiktok.notna().any() else 0.0) + search_s = search_g.fillna(search_g.median() if search_g.notna().any() else 0.0) + burst = ( + pd.Series(tiktok_s).rank(pct=True) * 0.6 + + pd.Series(search_s).rank(pct=True) * 0.4 + ).clip(0.0, 1.0) + else: + # 无外部热度字段:用规模 + 新鲜度 作为代理 + burst = (scale * 0.65 + freshness * 0.35).clip(0.0, 1.0) + out["burst_score"] = burst + + # 跟卖指数:竞争(records 越多)负向;利润空间/供应链难度若缺失则降级 + records = pd.to_numeric(out.get("records"), errors="coerce").fillna(0.0) + competition = records.rank(pct=True).clip(0.0, 1.0) # 越大越卷 + + margin_raw = out["margin"] if "margin" in out.columns else pd.Series(np.nan, index=out.index) + margin = pd.to_numeric(margin_raw, errors="coerce") + if margin.notna().any(): + margin_s = margin.fillna(margin.median()).rank(pct=True).clip(0.0, 1.0) + margin_w = 0.35 + else: + margin_s = pd.Series(0.5, index=out.index) + margin_w = 0.15 + + supply_raw = out["supply_difficulty"] if "supply_difficulty" in out.columns else pd.Series(np.nan, index=out.index) + supply = 
pd.to_numeric(supply_raw, errors="coerce") + if supply.notna().any(): + supply_s = (1.0 - supply.fillna(supply.median()).rank(pct=True)).clip(0.0, 1.0) # 越难越低分 + supply_w = 0.20 + else: + supply_s = pd.Series(0.5, index=out.index) + supply_w = 0.10 + + # 趋势作为正向 + trend = (out["potential_score"] * 0.5 + out["burst_score"] * 0.5).clip(0.0, 1.0) + trend_w = 0.45 + + comp_w = 0.20 + follow = ( + trend * trend_w + + margin_s * margin_w + + supply_s * supply_w + + (1.0 - competition) * comp_w + ) + out["follow_score"] = follow.clip(0.0, 1.0) + + # 生命周期预警(简化):过老 + 规模不增长 + 竞争高 => red-ocean / decline + lifecycle = [] + for i in out.index: + a = float(age_days.loc[i]) + comp = float(competition.loc[i]) + tr = float(trend.loc[i]) + if a > 120 and comp > 0.7 and tr < 0.4: + lifecycle.append("decline_or_red_ocean") + elif a > 60 and comp > 0.75: + lifecycle.append("red_ocean") + elif a < 21 and tr > 0.65: + lifecycle.append("early_growth") + else: + lifecycle.append("normal") + out["lifecycle"] = lifecycle + + return out + diff --git a/backend/app/settings.py b/backend/app/settings.py new file mode 100644 index 0000000..d12b616 --- /dev/null +++ b/backend/app/settings.py @@ -0,0 +1,42 @@ +from __future__ import annotations + +from pydantic_settings import BaseSettings, SettingsConfigDict + + +class Settings(BaseSettings): + model_config = SettingsConfigDict(env_file=".env", env_file_encoding="utf-8", extra="ignore") + + mysql_host: str + mysql_port: int = 3306 + mysql_user: str + mysql_password: str + mysql_database: str + + openai_api_key: str | None = None + openai_model: str = "gpt-4.1-mini" + openai_embed_model: str = "text-embedding-3-small" + + milvus_host: str = "localhost" + milvus_port: int = 19530 + milvus_collection: str = "products_v1" + + app_env: str = "dev" + app_cors_origins: str = "http://localhost:3000" + debug_print_db_sample: bool = False + debug_db_sample_limit: int = 10 + debug_db_sample_max_str_len: int = 120 + + @property + def mysql_url(self) -> str: + 
return ( + f"mysql+pymysql://{self.mysql_user}:{self.mysql_password}" + f"@{self.mysql_host}:{self.mysql_port}/{self.mysql_database}?charset=utf8mb4" + ) + + @property + def cors_origins_list(self) -> list[str]: + return [o.strip() for o in self.app_cors_origins.split(",") if o.strip()] + + +settings = Settings() + diff --git a/backend/requirements.txt b/backend/requirements.txt new file mode 100644 index 0000000..28d48c4 --- /dev/null +++ b/backend/requirements.txt @@ -0,0 +1,13 @@ +fastapi==0.115.11 +uvicorn[standard]==0.34.0 +pydantic==2.10.6 +pydantic-settings==2.8.1 +sqlalchemy==2.0.39 +pymysql==1.1.1 +pandas==2.2.3 +numpy==2.2.3 +scikit-learn==1.6.1 +statsmodels==0.14.4 +httpx==0.28.1 +tenacity==9.0.0 +python-dotenv==1.0.1 diff --git a/dev.sh b/dev.sh new file mode 100755 index 0000000..3e7a507 --- /dev/null +++ b/dev.sh @@ -0,0 +1,100 @@ +#!/usr/bin/env sh +set -eu + +ROOT_DIR="$(cd "$(dirname "$0")" && pwd)" +cd "$ROOT_DIR" + +MODE="${1:-up}" + +if ! command -v docker >/dev/null 2>&1; then + echo "Docker 未安装或不可用。请先安装 Docker Desktop。" >&2 + exit 1 +fi + +if ! docker info >/dev/null 2>&1; then + echo "Docker daemon 未启动。请先启动 Docker Desktop。" >&2 + exit 1 +fi + +if ! command -v docker >/dev/null 2>&1 || ! docker compose version >/dev/null 2>&1; then + echo "docker compose 不可用,请升级 Docker Desktop。" >&2 + exit 1 +fi + +if [ ! -f "backend/.env" ]; then + cp "backend/.env.example" "backend/.env" + echo "已生成 backend/.env(请把 MYSQL_PASSWORD / OPENAI_API_KEY 改成你的值)" +fi + +case "$MODE" in + up) + echo "启动中:构建并启动所有容器(随后将跟随日志,按 Ctrl+C 退出跟随)..." + docker compose up --build -d + ;; + -d|detached) + echo "启动中:构建并后台启动所有容器..." 
+ docker compose up --build -d + ;; + down) + docker compose down + exit 0 + ;; + ps) + docker compose ps + exit 0 + ;; + logs) + docker compose logs --tail=200 backend frontend + exit 0 + ;; + follow|f) + docker compose logs -f --tail=200 backend frontend + exit 0 + ;; + *) + echo "用法:" + echo " sh dev.sh # 启动并跟随日志(默认)" + echo " sh dev.sh -d # 后台启动" + echo " sh dev.sh follow # 跟随日志" + echo " sh dev.sh logs # 打印最近日志" + echo " sh dev.sh ps # 查看容器状态" + echo " sh dev.sh down # 停止并清理" + exit 1 + ;; +esac + +echo "" +echo "容器状态:" +docker compose ps + +echo "" +echo "后端最近日志(backend,tail=120):" +docker compose logs --tail=120 backend || true + +echo "" +echo "前端最近日志(frontend,tail=120):" +docker compose logs --tail=120 frontend || true + +echo "" +echo "快速自检(HTTP):" +( + curl -fsS "http://localhost:8000/docs" >/dev/null 2>&1 && echo "- backend: OK (http://localhost:8000/docs)" || echo "- backend: FAIL (检查 backend 日志/数据库连通性)" + curl -fsS "http://localhost:3001" >/dev/null 2>&1 && echo "- frontend: OK (http://localhost:3001)" || echo "- frontend: FAIL (检查 frontend 日志)" + curl -fsS "http://localhost:3001/api/metrics/overview" >/dev/null 2>&1 && echo "- api-proxy: OK (/api/metrics/overview)" || echo "- api-proxy: FAIL (通常是 backend 500)" +) || true + +echo "" +echo "已启动:" +echo "- 前端: http://localhost:3001" +echo "- 后端 Swagger: http://localhost:8000/docs" +echo "- Milvus: localhost:19530" +echo "" +echo "跟随日志:sh dev.sh follow (或 docker compose logs -f --tail=200 backend frontend)" +echo "停止:docker compose down" + +if [ "$MODE" = "up" ]; then + echo "" + echo "开始跟随日志(Ctrl+C 可退出跟随,不会停止容器)..." 
+ docker compose logs -f --tail=200 backend frontend +fi + diff --git a/docker-compose.yml b/docker-compose.yml new file mode 100644 index 0000000..d5020f5 --- /dev/null +++ b/docker-compose.yml @@ -0,0 +1,80 @@ +services: + backend: + build: + context: ./backend + dockerfile: Dockerfile + env_file: + - ./backend/.env + environment: + - APP_CORS_ORIGINS=http://localhost:3000,http://localhost:3001 + ports: + - "8000:8000" + depends_on: + - milvus + restart: unless-stopped + + frontend: + build: + context: ./frontend + dockerfile: Dockerfile + env_file: + - ./frontend/.env.example + environment: + - BACKEND_URL=http://backend:8000 + ports: + - "3001:3000" + depends_on: + - backend + restart: unless-stopped + + # Milvus standalone (vector DB) + etcd: + image: quay.io/coreos/etcd:v3.5.16 + environment: + - ETCD_AUTO_COMPACTION_MODE=revision + - ETCD_AUTO_COMPACTION_RETENTION=1000 + - ETCD_QUOTA_BACKEND_BYTES=4294967296 + - ETCD_SNAPSHOT_COUNT=50000 + command: > + etcd + -advertise-client-urls=http://0.0.0.0:2379 + -listen-client-urls=http://0.0.0.0:2379 + --data-dir /etcd + volumes: + - milvus_etcd:/etcd + restart: unless-stopped + + minio: + image: minio/minio:RELEASE.2025-01-20T14-49-07Z + environment: + - MINIO_ACCESS_KEY=minioadmin + - MINIO_SECRET_KEY=minioadmin + command: minio server /minio_data --console-address ":9001" + ports: + - "9000:9000" + - "9001:9001" + volumes: + - milvus_minio:/minio_data + restart: unless-stopped + + milvus: + image: milvusdb/milvus:v2.5.6 + command: ["milvus", "run", "standalone"] + environment: + - ETCD_ENDPOINTS=etcd:2379 + - MINIO_ADDRESS=minio:9000 + ports: + - "19530:19530" + - "9091:9091" + volumes: + - milvus_data:/var/lib/milvus + depends_on: + - etcd + - minio + restart: unless-stopped + +volumes: + milvus_etcd: + milvus_minio: + milvus_data: + diff --git a/frontend/Dockerfile b/frontend/Dockerfile new file mode 100644 index 0000000..a843bec --- /dev/null +++ b/frontend/Dockerfile @@ -0,0 +1,20 @@ +FROM node:20-slim AS 
deps +WORKDIR /app +COPY package.json /app/package.json +RUN npm install + +FROM node:20-slim AS builder +WORKDIR /app +COPY --from=deps /app/node_modules /app/node_modules +COPY . /app +RUN npm run build + +FROM node:20-slim AS runner +WORKDIR /app +ENV NODE_ENV=production +COPY --from=builder /app/.next/standalone /app +COPY --from=builder /app/.next/static /app/.next/static +COPY --from=builder /app/public /app/public +EXPOSE 3000 +CMD ["node", "server.js"] + diff --git a/frontend/app/api/[...path]/route.ts b/frontend/app/api/[...path]/route.ts new file mode 100644 index 0000000..1e058fd --- /dev/null +++ b/frontend/app/api/[...path]/route.ts @@ -0,0 +1,62 @@ +import { NextRequest } from "next/server"; + +const BACKEND_URL = process.env.BACKEND_URL || "http://localhost:8000"; + +export async function GET(req: NextRequest, ctx: { params: Promise<{ path: string[] }> }) { + const { path } = await ctx.params; + const url = new URL(req.url); + const upstream = `${BACKEND_URL}/api/${path.join("/")}${url.search}`; + return await safeUpstreamFetch(() => fetch(upstream, { headers: forwardHeaders(req) })); +} + +export async function POST(req: NextRequest, ctx: { params: Promise<{ path: string[] }> }) { + const { path } = await ctx.params; + const url = new URL(req.url); + const upstream = `${BACKEND_URL}/api/${path.join("/")}${url.search}`; + const body = await req.text(); + return await safeUpstreamFetch(() => + fetch(upstream, { + method: "POST", + headers: { ...forwardHeaders(req), "content-type": req.headers.get("content-type") || "application/json" }, + body + }) + ); +} + +function forwardHeaders(req: NextRequest) { + const h = new Headers(); + const auth = req.headers.get("authorization"); + if (auth) h.set("authorization", auth); + return h; +} + +async function forwardResponse(r: Response) { + const headers = new Headers(r.headers); + headers.delete("access-control-allow-origin"); + return new Response(await r.arrayBuffer(), { status: r.status, headers }); +} + 
+
+async function safeUpstreamFetch(doFetch: () => Promise<Response>) {
+  const maxAttempts = 3;
+  for (let i = 0; i < maxAttempts; i++) {
+    try {
+      const r = await doFetch();
+      return await forwardResponse(r);
+    } catch (e: any) {
+      const code = e?.cause?.code || e?.code;
+      const retryable = code === "ECONNREFUSED" || code === "ENOTFOUND" || code === "EAI_AGAIN";
+      if (!retryable || i === maxAttempts - 1) {
+        return new Response(
+          JSON.stringify({
+            detail: "上游后端不可达(backend 容器可能正在重启或未就绪)。",
+            error: String(code || e?.message || e)
+          }),
+          { status: 503, headers: { "content-type": "application/json; charset=utf-8" } }
+        );
+      }
+      await new Promise((r) => setTimeout(r, 200 * (i + 1)));
+    }
+  }
+  return new Response(JSON.stringify({ detail: "unknown" }), { status: 503 });
+}
+
 diff --git a/frontend/app/layout.tsx b/frontend/app/layout.tsx new file mode 100644 index 0000000..f3181b2 --- /dev/null +++ b/frontend/app/layout.tsx @@ -0,0 +1,15 @@ +export const metadata = {
+  title: "Crawl BI Dashboard",
+  description: "Sales BI + Trend Engine + AI insight"
+};
+
+export default function RootLayout({ children }: { children: React.ReactNode }) {
+  return (
+    <html>
+      <body>
+        {children}
+      </body>
+    </html>
+  );
+}
+
 diff --git a/frontend/app/page.tsx b/frontend/app/page.tsx new file mode 100644 index 0000000..671b6e9 --- /dev/null +++ b/frontend/app/page.tsx @@ -0,0 +1,6 @@ +import Dashboard from "../components/Dashboard";
+
+export default function Page() {
+  return <Dashboard />;
+}
+
 diff --git a/frontend/components/Dashboard.tsx b/frontend/components/Dashboard.tsx new file mode 100644 index 0000000..13dac5a --- /dev/null +++ b/frontend/components/Dashboard.tsx @@ -0,0 +1,228 @@ +"use client";
+
+import React from "react";
+import ReactECharts from "echarts-for-react";
+
+type OverviewResponse = {
+  schema: any;
+  metrics: Record<string, number>;
+};
+
+type Winner = {
+  product_id: string;
+  title?: string | null;
+  category?: string | null;
+  units?: number;
+  gmv?: number;
+  potential_score?: number;
+  burst_score?: number;
+  
 follow_score?: number;
+  lifecycle?: string;
+};
+
+async function getJSON<T>(path: string): Promise<T> {
+  const r = await fetch(path, { cache: "no-store" });
+  if (!r.ok) throw new Error(await r.text());
+  return (await r.json()) as T;
+}
+
+async function postJSON<T>(path: string, body: any): Promise<T> {
+  const r = await fetch(path, {
+    method: "POST",
+    headers: { "content-type": "application/json" },
+    body: JSON.stringify(body)
+  });
+  if (!r.ok) throw new Error(await r.text());
+  return (await r.json()) as T;
+}
+
+export default function Dashboard() {
+  const [overview, setOverview] = React.useState<OverviewResponse | null>(null);
+  const [winners, setWinners] = React.useState<Winner[]>([]);
+  const [productId, setProductId] = React.useState("");
+  const [series, setSeries] = React.useState<{ ds: string; units: number; gmv: number }[]>([]);
+  const [forecast, setForecast] = React.useState<{ ds: string; units_hat: number }[]>([]);
+  const [aiQuery, setAiQuery] = React.useState("给我当前最值得跟卖的3个品类/商品,并说明理由与风险。");
+  const [aiAnswer, setAiAnswer] = React.useState("");
+  const [err, setErr] = React.useState("");
+
+  React.useEffect(() => {
+    (async () => {
+      try {
+        const o = await getJSON<OverviewResponse>("/api/metrics/overview");
+        setOverview(o);
+        const w = await getJSON<{ items: Winner[] }>("/api/trend/potential-winners?days=14&limit=30");
+        setWinners(w.items);
+      } catch (e: any) {
+        setErr(String(e?.message || e));
+      }
+    })();
+  }, []);
+
+  async function loadProduct(p: string) {
+    setErr("");
+    if (!p.trim()) {
+      setErr("请输入 product_id(不能为空)");
+      return;
+    }
+    try {
+      const ts = await getJSON<{ product_id: string; points: any[] }>(
+        `/api/metrics/sales/timeseries?product_id=${encodeURIComponent(p)}&days=60`
+      );
+      setSeries(ts.points as any);
+      const fc = await getJSON<{ forecast: any[] }>(
+        `/api/trend/forecast?product_id=${encodeURIComponent(p)}&days=60&horizon=14`
+      );
+      setForecast(fc.forecast as any);
+    } catch (e: any) {
+      setErr(String(e?.message || e));
+    }
+  }
+
+  async function runAI() {
+    setErr("");
+    try
{ + const r = await postJSON<{ answer: string }>("/api/ai/insight", { + query: aiQuery, + product_id: productId || undefined, + top_k: 6 + }); + setAiAnswer(r.answer || ""); + } catch (e: any) { + setErr(String(e?.message || e)); + } + } + + const chartOption = { + tooltip: { trigger: "axis" }, + legend: { data: ["units", "gmv", "forecast_units"] }, + xAxis: { type: "category", data: series.map((p) => p.ds.slice(0, 10)) }, + yAxis: [{ type: "value" }, { type: "value" }], + series: [ + { name: "units", type: "line", smooth: true, data: series.map((p) => p.units) }, + { name: "gmv", type: "line", smooth: true, yAxisIndex: 1, data: series.map((p) => p.gmv) }, + { + name: "forecast_units", + type: "line", + smooth: true, + lineStyle: { type: "dashed" }, + data: new Array(Math.max(0, series.length - 1)).fill(null).concat(forecast.map((f) => f.units_hat)) + } + ] + }; + + return ( +
+
+

爆款闭环 BI 看板

+
+
+ + {err ? ( +
+          {err}
+        
+ ) : null} + +
+ {overview?.metrics?.products ?? "-"} + {overview?.metrics?.units_30d ?? "-"} + {overview?.metrics?.gmv_30d ?? "-"} + {overview?.metrics?.rows_30d ?? "-"} +
+ +
+
+
+ 实时曲线 + 趋势预测 + setProductId(e.target.value)} + placeholder="输入 product_id" + style={{ flex: 1, padding: "8px 10px", borderRadius: 8, border: "1px solid #ddd" }} + /> + +
+
+ +
+
+ +
+ 发现爆款(Potential Winners) +
+ {winners.map((w) => ( +
{ + setProductId(w.product_id); + loadProduct(w.product_id); + }} + > +
+
{w.title || w.product_id}
+
{w.lifecycle}
+
+
+ units={fmt(w.units)} gmv={fmt(w.gmv)} · potential={fmt(w.potential_score)} · burst={fmt(w.burst_score)} · + follow={fmt(w.follow_score)} +
+
+ ))} +
+
+
+ +
+
+ AI 分析与决策辅助 + +
+