fix:优化参数指数

This commit is contained in:
Daniel
2026-03-06 14:20:18 +08:00
parent 3251de6406
commit 13a8d8af91
6 changed files with 203 additions and 14 deletions

View File

@@ -0,0 +1,83 @@
# -*- coding: utf-8 -*-
"""
华尔街财团投入指数 & 反击情绪指数:从爬虫实时数据计算稳定指标,抑制单条报道导致的剧烈波动。
供 db_merge.merge() 调用,写入同一批 DB 表,前端契约不变。
"""
from typing import Optional, Tuple
VALUE_CLAMP_MIN = 1
VALUE_CLAMP_MAX = 99
# 华尔街:与上一点平滑,新点权重
WALL_STREET_NEW_WEIGHT = 0.35 # raw 权重1 - 此值 = 上一点权重,越大曲线越平滑
# 华尔街:两次写入最小间隔(分钟),避免短时间多条报道造成密集锯齿
WALL_STREET_MIN_INTERVAL_MINUTES = 20
# 反击情绪当前值权重1 - 此值 = 新 raw 权重)
RETALIATION_CURRENT_WEIGHT = 0.8
# 反击情绪:单次更新相对当前值的最大变化幅度(绝对值)
RETALIATION_MAX_STEP = 5
def clamp(value: int) -> int:
return max(VALUE_CLAMP_MIN, min(VALUE_CLAMP_MAX, int(value)))
def smooth_wall_street(
raw_value: int,
last_value: Optional[int],
*,
new_weight: float = WALL_STREET_NEW_WEIGHT,
) -> int:
"""
华尔街投入指数:用上一点做平滑,避免单条报道 30/80 导致曲线骤变。
若尚无上一点,直接使用限幅后的 raw。
"""
raw = clamp(raw_value)
if last_value is None:
return raw
w = 1.0 - new_weight
return clamp(round(w * last_value + new_weight * raw))
def wall_street_should_append(
last_time_iso: Optional[str],
new_time_iso: str,
min_interval_minutes: int = WALL_STREET_MIN_INTERVAL_MINUTES,
) -> bool:
"""
是否应追加一条华尔街趋势点。若与上一条间隔不足 min_interval_minutes 则跳过,
减少因爬虫短时间多篇报道导致的密集锯齿。
"""
if not last_time_iso:
return True
try:
from datetime import datetime
last = datetime.fromisoformat(last_time_iso.replace("Z", "+00:00"))
new = datetime.fromisoformat(new_time_iso.replace("Z", "+00:00"))
delta_min = (new - last).total_seconds() / 60
return delta_min >= min_interval_minutes
except Exception:
return True
def smooth_retaliation(
raw_value: int,
current_value: int,
*,
current_weight: float = RETALIATION_CURRENT_WEIGHT,
max_step: int = RETALIATION_MAX_STEP,
) -> int:
"""
反击情绪指数:先与当前值平滑,再限制单步变化幅度,避免连续多条报道导致快速漂移或抖动。
"""
raw = clamp(raw_value)
cur = clamp(current_value)
smoothed = round(current_weight * cur + (1.0 - current_weight) * raw)
smoothed = clamp(smoothed)
# 单步变化上限
delta = smoothed - cur
if abs(delta) > max_step:
step = max_step if delta > 0 else -max_step
smoothed = clamp(cur + step)
return smoothed