from __future__ import annotations

import math

import numpy as np
import pandas as pd

# Age assigned to rows whose ``first_seen`` is missing or unparseable, so
# that they are treated as old listings rather than as fresh ones.
_UNKNOWN_AGE_DAYS = 999.0
_SECONDS_PER_DAY = 86400.0


def _sigmoid(x: float) -> float:
    """Return the logistic function ``1 / (1 + exp(-x))`` of *x*.

    For negative inputs the algebraically equivalent form
    ``exp(x) / (1 + exp(x))`` is used, so ``math.exp`` is never called with
    a large positive argument (which would raise ``OverflowError``).
    """
    if x >= 0.0:
        return 1.0 / (1.0 + math.exp(-x))
    z = math.exp(x)
    return z / (1.0 + z)


def _numeric_column(df: pd.DataFrame, name: str) -> pd.Series:
    """Return column *name* coerced to numeric, or all-NaN if it is absent."""
    if name in df.columns:
        raw = df[name]
    else:
        raw = pd.Series(np.nan, index=df.index, dtype="float64")
    return pd.to_numeric(raw, errors="coerce")


def _age_in_days(df: pd.DataFrame, now: pd.Timestamp) -> pd.Series:
    """Return non-negative days elapsed since ``first_seen`` for each row."""
    if "first_seen" not in df.columns:
        return pd.Series(_UNKNOWN_AGE_DAYS, index=df.index, dtype="float64")
    first_seen = pd.to_datetime(df["first_seen"], errors="coerce", utc=True)
    age = (now - first_seen).dt.total_seconds() / _SECONDS_PER_DAY
    # Future timestamps would give a negative age; floor them at zero.
    return age.fillna(_UNKNOWN_AGE_DAYS).clip(lower=0.0)


def compute_trend_scores(df: pd.DataFrame) -> pd.DataFrame:
    """Attach explainable trend scores to a copy of *df*.

    The input schema is not guaranteed: every column read here is optional,
    and a missing column falls back to a neutral default instead of raising.

    Columns read (all optional):
        first_seen: parsed as UTC datetimes; missing or unparseable values
            are treated as 999 days old.
        units, gmv: numeric size signals; missing, unparseable or negative
            values count as 0.
        tiktok_hot, search_growth: external heat signals. If neither has
            any usable value, size and freshness are used as a proxy.
        records: numeric; a higher percentile rank means more competition.
        margin, supply_difficulty: when a column has no usable value its
            component is fixed at 0.5 and its weight is reduced.

    Columns added:
        potential_score: freshness blended with size rank, in [0, 1].
        burst_score: heat-signal rank (or the proxy), in [0, 1].
        follow_score: weighted blend of trend, margin, supply ease and
            inverse competition, clipped to [0, 1].
        lifecycle: one of ``"decline_or_red_ocean"``, ``"red_ocean"``,
            ``"early_growth"`` or ``"normal"``.

    Returns:
        A new DataFrame; *df* itself is not modified.
    """
    out = df.copy()
    now = pd.Timestamp.now(tz="UTC")

    # --- Potential winners: recently first seen AND relatively large. ---
    age_days = _age_in_days(out, now)

    # Negative sales/GMV are clamped to 0 so log1p cannot produce NaN/-inf.
    units = _numeric_column(out, "units").fillna(0.0).clip(lower=0.0)
    gmv = _numeric_column(out, "gmv").fillna(0.0).clip(lower=0.0)

    # log1p dampens large values before ranking.
    units_s = np.log1p(units)
    gmv_s = np.log1p(gmv)

    freshness = 1.0 / (1.0 + (age_days / 7.0))  # in (0, 1]; 0.5 at 7 days
    scale = (
        units_s.rank(pct=True) * 0.6 + gmv_s.rank(pct=True) * 0.4
    ).clip(0.0, 1.0)
    out["potential_score"] = (freshness * 0.55 + scale * 0.45).clip(0.0, 1.0)

    # --- Burst: prefer the optional external heat columns when present. ---
    tiktok = _numeric_column(out, "tiktok_hot")
    search_g = _numeric_column(out, "search_growth")
    has_tiktok = bool(tiktok.notna().any())
    has_search = bool(search_g.notna().any())

    if has_tiktok or has_search:
        # Gaps are filled with the column median; a signal with no usable
        # value at all is filled with 0.0, so every row ties on it.
        tiktok_s = tiktok.fillna(tiktok.median() if has_tiktok else 0.0)
        search_s = search_g.fillna(search_g.median() if has_search else 0.0)
        burst = (
            tiktok_s.rank(pct=True) * 0.6 + search_s.rank(pct=True) * 0.4
        ).clip(0.0, 1.0)
    else:
        # No external heat signal: use size plus freshness as a proxy.
        burst = (scale * 0.65 + freshness * 0.35).clip(0.0, 1.0)
    out["burst_score"] = burst

    # --- Follow-sell index. ---
    # Competition counts against a product: more records = more crowded.
    records = _numeric_column(out, "records").fillna(0.0)
    competition = records.rank(pct=True).clip(0.0, 1.0)

    margin = _numeric_column(out, "margin")
    if margin.notna().any():
        margin_s = margin.fillna(margin.median()).rank(pct=True).clip(0.0, 1.0)
        margin_w = 0.35
    else:
        # Unknown margin: neutral score with a reduced weight.
        margin_s = pd.Series(0.5, index=out.index)
        margin_w = 0.15

    supply = _numeric_column(out, "supply_difficulty")
    if supply.notna().any():
        # Harder supply chain gives a lower score, hence the inversion.
        supply_s = (
            1.0 - supply.fillna(supply.median()).rank(pct=True)
        ).clip(0.0, 1.0)
        supply_w = 0.20
    else:
        supply_s = pd.Series(0.5, index=out.index)
        supply_w = 0.10

    # Trend contributes positively.
    trend = (
        out["potential_score"] * 0.5 + out["burst_score"] * 0.5
    ).clip(0.0, 1.0)
    trend_w = 0.45
    comp_w = 0.20

    # NOTE(review): with margin and supply both present the weights sum to
    # 1.2 (0.9 when both are absent); the final clip caps the result at 1.0.
    follow = (
        trend * trend_w
        + margin_s * margin_w
        + supply_s * supply_w
        + (1.0 - competition) * comp_w
    )
    out["follow_score"] = follow.clip(0.0, 1.0)

    # --- Lifecycle warning (simplified). ---
    # Plain arrays keep this positional, so duplicate index labels are fine.
    # np.select takes the first matching condition, like an if/elif chain.
    age = age_days.to_numpy()
    comp = competition.to_numpy()
    tr = trend.to_numpy()
    conditions = [
        (age > 120) & (comp > 0.7) & (tr < 0.4),
        (age > 60) & (comp > 0.75),
        (age < 21) & (tr > 0.65),
    ]
    labels = ["decline_or_red_ocean", "red_ocean", "early_growth"]
    out["lifecycle"] = np.select(conditions, labels, default="normal")

    return out