"""Debug helper that dumps the database schema and a few sample rows to the log."""

from __future__ import annotations

import json
import logging
from typing import Any

import pandas as pd
from sqlalchemy import Engine, text

from ..db import get_engine
from ..services.schema_discovery import discover_schema
from ..settings import settings

log = logging.getLogger("db_sample")

# Upper bound on how many tables are sampled per call, to keep the log readable.
_MAX_TABLES = 8

# Marker appended to strings that were cut short.
_ELLIPSIS = "..."


def _truncate_value(v: Any, max_len: int) -> Any:
    """Return *v* in a log-friendly form, shortening long text.

    ``None`` and ``int``/``float``/``bool`` values are returned unchanged.
    Everything else is converted with ``str()``; if the text is longer than
    ``max_len`` it is cut and ``"..."`` is appended so the result is exactly
    ``max_len`` characters long.
    """
    if v is None:
        return None
    if isinstance(v, (int, float, bool)):
        return v
    s = str(v)
    if len(s) <= max_len:
        return s
    if max_len < len(_ELLIPSIS):
        # No room for the ellipsis: a negative slice end would otherwise keep
        # almost the whole string, so just hard-cut to the limit.
        return s[: max(max_len, 0)]
    return s[: max_len - len(_ELLIPSIS)] + _ELLIPSIS


def _df_to_records(df: pd.DataFrame, max_str_len: int) -> list[dict[str, Any]]:
    """Convert *df* into a list of ``{column name: value}`` dicts, one per row.

    Column labels are converted with ``str()`` and every value goes through
    ``_truncate_value`` with ``max_str_len``.
    """
    columns = [str(c) for c in df.columns]
    # itertuples() keeps each column's own type; iterrows() builds a Series per
    # row, which upcasts values across columns (e.g. ints turn into floats).
    return [
        {col: _truncate_value(val, max_str_len) for col, val in zip(columns, row)}
        for row in df.itertuples(index=False, name=None)
    ]


def _quote_identifier(engine: Engine, name: str) -> str:
    """Quote *name* as an SQL identifier using the engine's dialect rules."""
    return engine.dialect.identifier_preparer.quote(name)


def _list_tables(engine: Engine) -> list[str]:
    """Return the table names of the current database, sorted by name.

    The query relies on ``information_schema.tables`` and ``DATABASE()``.
    """
    query = text(
        """
        SELECT table_name
        FROM information_schema.tables
        WHERE table_schema = DATABASE()
        ORDER BY table_name
        """
    )
    with engine.connect() as conn:
        rows = conn.execute(query).all()
    return [r[0] for r in rows]


def _table_row_count(engine: Engine, table: str) -> int | None:
    """Return ``COUNT(*)`` for *table*, or ``None`` if it cannot be determined."""
    # Quote the name so reserved words or unusual characters do not break the SQL.
    quoted = _quote_identifier(engine, table)
    try:
        with engine.connect() as conn:
            v = conn.execute(text(f"SELECT COUNT(*) FROM {quoted}")).scalar()
        return int(v) if v is not None else None
    except Exception:
        # Best effort: the count is informational only, so never propagate.
        log.debug("DB SAMPLE: row count failed for table=%s", table, exc_info=True)
        return None


def _sample_table(engine: Engine, table: str, limit: int) -> pd.DataFrame:
    """Read up to *limit* rows from *table* into a DataFrame.

    Errors from the database are propagated to the caller.
    """
    quoted = _quote_identifier(engine, table)
    # int() guarantees that only a number is interpolated into the statement.
    query = text(f"SELECT * FROM {quoted} LIMIT {int(limit)}")
    with engine.connect() as conn:
        return pd.read_sql(query, conn)


def print_db_sample_to_logs(limit: int | None = None) -> None:
    """Log the database structure and sample rows to the backend log for analysis.

    Long strings are truncated so the log does not blow up.

    Args:
        limit: Maximum rows sampled per table. When ``None`` or ``0``,
            ``settings.debug_db_sample_limit`` is used instead.
    """
    engine = get_engine()
    schema = discover_schema(engine)
    eff_limit = int(limit or settings.debug_db_sample_limit)
    max_str_len = int(settings.debug_db_sample_max_str_len)

    tables = _list_tables(engine)
    log.warning("DB SAMPLE: discovered schema=%s", schema.model_dump())
    log.warning("DB SAMPLE: tables=%s", tables)

    # Discovered tables go first, then the rest in listing order;
    # dict.fromkeys removes duplicates while preserving that order.
    discovered = [
        t for t in (schema.sales_table, schema.products_table) if t and t in tables
    ]
    prioritized = list(dict.fromkeys([*discovered, *tables]))

    for t in prioritized[:_MAX_TABLES]:
        cnt = _table_row_count(engine, t)
        try:
            df = _sample_table(engine, t, eff_limit)
            recs = _df_to_records(df, max_str_len=max_str_len)
            log.warning(
                "DB SAMPLE: table=%s rows=%s cols=%s sample=%s",
                t,
                cnt,
                list(df.columns),
                json.dumps(recs, ensure_ascii=False),
            )
        except Exception as e:
            # One unreadable table must not abort the dump of the others.
            log.warning("DB SAMPLE: table=%s rows=%s sample_failed=%s", t, cnt, str(e))