from __future__ import annotations

from dataclasses import dataclass
from pydantic import BaseModel
from sqlalchemy import Engine, text


class DiscoveredSchema(BaseModel):
    """Auto-discovered mapping of a database's sales/products tables and columns.

    Every field is optional: discovery may fail to find a suitable table or
    column, in which case the corresponding name stays ``None``.  The SQL
    properties interpolate these identifiers directly (identifiers cannot be
    bound parameters); this is acceptable only because the names originate
    from ``information_schema`` via :func:`discover_schema`, never from user
    input.
    """

    # discovered table names
    sales_table: str | None = None
    products_table: str | None = None

    # required columns (in sales_table)
    sales_product_id_col: str | None = None
    sales_time_col: str | None = None
    sales_units_col: str | None = None
    sales_amount_col: str | None = None

    # optional product cols
    product_title_col: str | None = None
    product_created_col: str | None = None
    product_rank_col: str | None = None
    product_category_col: str | None = None
    product_desc_col: str | None = None

    # True when products_table exposes the same product-id column name as
    # sales_table (required for a safe join in trend_candidates_sql).
    # Declared as a real pydantic field: the previous implementation stored
    # this via setattr() on an undeclared attribute, which raises ValueError
    # on pydantic models.
    products_pid_same: bool = False

    @property
    def overview_sql(self) -> str:
        """Return a 30-day rollup query: distinct products, units, GMV, row count.

        Uses MySQL's ``UTC_TIMESTAMP()``; callers must only run this after
        discovery has populated the sales columns.
        """
        # minimal, safe aggregations
        t = self.sales_table
        pid = self.sales_product_id_col
        ts = self.sales_time_col
        units = self.sales_units_col
        amount = self.sales_amount_col
        return f"""
        SELECT
          COUNT(DISTINCT {pid}) AS products,
          SUM(COALESCE({units}, 0)) AS units_30d,
          SUM(COALESCE({amount}, 0)) AS gmv_30d,
          COUNT(*) AS rows_30d
        FROM {t}
        WHERE {ts} >= (UTC_TIMESTAMP() - INTERVAL 30 DAY)
        """

    @property
    def timeseries_sql(self) -> str:
        """Return a per-day units/GMV query for one product.

        Bound parameters: ``:product_id`` and ``:since`` (lower time bound).
        """
        t = self.sales_table
        pid = self.sales_product_id_col
        ts = self.sales_time_col
        units = self.sales_units_col
        amount = self.sales_amount_col
        return f"""
        SELECT
          DATE({ts}) AS ds,
          SUM(COALESCE({units}, 0)) AS units,
          SUM(COALESCE({amount}, 0)) AS gmv
        FROM {t}
        WHERE {pid} = :product_id
          AND {ts} >= :since
        GROUP BY DATE({ts})
        ORDER BY ds ASC
        """

    @property
    def trend_candidates_sql(self) -> str:
        """Return a per-product rollup query since ``:since``, limited by ``:limit``.

        Joins the products table for title/category/created/rank columns when
        (a) a products table was discovered and (b) it shares the sales
        table's product-id column name; otherwise returns sales-only metrics.
        """
        # produce per-product last-N-day rollups; join products when available
        t = self.sales_table
        pid = self.sales_product_id_col
        ts = self.sales_time_col
        units = self.sales_units_col
        amount = self.sales_amount_col
        p = self.products_table
        title = self.product_title_col
        created = self.product_created_col
        rank = self.product_rank_col
        cat = self.product_category_col

        # if we can't confidently join, still return sales-only metrics
        join = ""
        if p and self._products_has_same_pid_name:
            join = f"LEFT JOIN {p} p ON p.{pid} = s.{pid}"

        select_p = ""
        if p and join:
            # each optional product column degrades to NULL when missing
            title_expr = f"p.{title}" if title else "NULL"
            cat_expr = f"p.{cat}" if cat else "NULL"
            created_expr = f"p.{created}" if created else "NULL"
            rank_expr = f"p.{rank}" if rank else "NULL"
            select_p = f""",
          {title_expr} AS title,
          {cat_expr} AS category,
          {created_expr} AS created_at,
          {rank_expr} AS rank_now
            """

        return f"""
        SELECT
          s.{pid} AS product_id,
          SUM(COALESCE(s.{units}, 0)) AS units,
          SUM(COALESCE(s.{amount}, 0)) AS gmv,
          COUNT(*) AS records,
          MIN(s.{ts}) AS first_seen,
          MAX(s.{ts}) AS last_seen
          {select_p}
        FROM {t} s
        {join}
        WHERE s.{ts} >= :since
        GROUP BY s.{pid}
        ORDER BY units DESC
        LIMIT :limit
        """

    @property
    def _products_has_same_pid_name(self) -> bool:
        # kept for backward compatibility; now backed by a declared field
        return self.products_pid_same

    def set_products_pid_same(self, v: bool) -> None:
        """Record whether the products table uses the sales table's pid column name."""
        self.products_pid_same = v


# Candidate column names, in priority order, used by discovery heuristics.
SALES_UNITS_CANDIDATES = ["units", "qty", "quantity", "sales", "sold", "order_qty", "num"]
SALES_AMOUNT_CANDIDATES = ["amount", "gmv", "revenue", "pay_amount", "total", "price", "order_amount"]
TIME_CANDIDATES = ["created_at", "create_time", "created", "ts", "timestamp", "date_time", "paid_at", "order_time"]
PID_CANDIDATES = ["product_id", "item_id", "sku_id", "goods_id", "asin"]
PRODUCT_TITLE_CANDIDATES = ["title", "name", "product_name", "item_title"]
PRODUCT_DESC_CANDIDATES = ["description", "desc", "detail"]
PRODUCT_CREATED_CANDIDATES = ["created_at", "create_time", "created"]
PRODUCT_RANK_CANDIDATES = ["rank", "bsr_rank", "position"]
PRODUCT_CATEGORY_CANDIDATES = ["category", "cat", "category_name"]


def _lower(s: str | None) -> str:
    """Lowercase *s*, treating ``None`` as the empty string."""
    return (s or "").lower()


def _pick(cols: list[str], candidates: list[str]) -> str | None:
    """Return the first candidate present in *cols* (case-insensitive match).

    The returned value is the column's original spelling from *cols*, not the
    candidate; ``None`` when no candidate matches.
    """
    cols_l = {_lower(c): c for c in cols}
    for cand in candidates:
        if cand in cols_l:
            return cols_l[cand]
    return None


def discover_schema(engine: Engine) -> DiscoveredSchema:
    """Best-effort automatic schema discovery against an unknown database.

    Heuristics (MySQL: relies on ``information_schema`` and ``DATABASE()``):
    - prefer a table containing product_id + time + units/amount columns as
      ``sales_table``;
    - prefer a table containing title/name-like columns as ``products_table``.

    Returns a :class:`DiscoveredSchema`; fields remain ``None`` when no
    suitable table is found.
    """
    with engine.connect() as conn:
        rows = conn.execute(
            text(
                """
                SELECT table_name, column_name
                FROM information_schema.columns
                WHERE table_schema = DATABASE()
                ORDER BY table_name, ordinal_position
                """
            )
        ).all()

    by_table: dict[str, list[str]] = {}
    for t, c in rows:
        by_table.setdefault(t, []).append(c)

    best_sales: tuple[int, str, dict[str, str]] | None = None
    best_products: tuple[int, str, dict[str, str]] | None = None

    for t, cols in by_table.items():
        pid = _pick(cols, PID_CANDIDATES)
        ts = _pick(cols, TIME_CANDIDATES)
        units = _pick(cols, SALES_UNITS_CANDIDATES)
        amount = _pick(cols, SALES_AMOUNT_CANDIDATES)

        # pid + time are mandatory (3 points each); units/amount are bonuses
        score = 0
        if pid:
            score += 3
        if ts:
            score += 3
        if units:
            score += 2
        if amount:
            score += 1
        if score >= 6:
            if best_sales is None or score > best_sales[0]:
                best_sales = (score, t, {"pid": pid, "ts": ts, "units": units, "amount": amount})

        title = _pick(cols, PRODUCT_TITLE_CANDIDATES)
        if title:
            pscore = 2
            if _pick(cols, PID_CANDIDATES):
                pscore += 2
            if _pick(cols, PRODUCT_CATEGORY_CANDIDATES):
                pscore += 1
            if _pick(cols, PRODUCT_DESC_CANDIDATES):
                pscore += 1
            if best_products is None or pscore > best_products[0]:
                best_products = (pscore, t, {"title": title})

    schema = DiscoveredSchema()
    if best_sales:
        _, t, m = best_sales
        schema.sales_table = t
        schema.sales_product_id_col = m["pid"]
        schema.sales_time_col = m["ts"]
        schema.sales_units_col = m["units"] or m["amount"]  # last resort
        schema.sales_amount_col = m["amount"] or m["units"]
    if best_products:
        _, pt, _ = best_products
        schema.products_table = pt
        cols = by_table.get(pt, [])
        schema.product_title_col = _pick(cols, PRODUCT_TITLE_CANDIDATES)
        schema.product_desc_col = _pick(cols, PRODUCT_DESC_CANDIDATES)
        schema.product_created_col = _pick(cols, PRODUCT_CREATED_CANDIDATES)
        schema.product_rank_col = _pick(cols, PRODUCT_RANK_CANDIDATES)
        schema.product_category_col = _pick(cols, PRODUCT_CATEGORY_CANDIDATES)
        # guard against None == None: without a discovered sales pid column,
        # enabling the join would emit "ON p.None = s.None"
        pid_in_products = _pick(cols, PID_CANDIDATES)
        schema.set_products_pid_same(
            pid_in_products is not None
            and pid_in_products == schema.sales_product_id_col
        )
    return schema