Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
158 changes: 158 additions & 0 deletions stratevo/evolution/auto_evolve.py
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,141 @@
logger.info("Evolution Engine v4 -- Walk-Forward + Smart Evolution + 57-dim Factors")


def _compute_ic_correlation(
    active_factor_names: List[str],
    indicators: Dict[str, Dict[str, Any]],
    codes: List[str],
    seed: int,
    sample_stocks: int = 20,
    sample_dates: int = 50,
) -> float:
    """Approximate the mean pairwise |Pearson r| among active factor signals.

    A cheap sampling scheme (a handful of stocks crossed with a handful of
    date indices) is applied to the pre-computed ``indicators`` dict, so no
    indicator is ever recomputed and the whole call stays well under 100ms.

    Args:
        active_factor_names: Weight keys whose weight is >= 0.001 (``w_*``).
        indicators: Pre-computed dict ``{code: {indicator_name: value}}``.
        codes: Stock codes available in the current evaluation sample.
        seed: Seed for the deterministic sampler.
        sample_stocks: Upper bound on sampled stocks (default 20).
        sample_dates: Upper bound on sampled date indices (default 50).

    Returns:
        Mean absolute correlation over every active factor pair, in [0, 1].
        0.0 when fewer than two factors are active or data is too short.
    """
    if len(active_factor_names) < 2 or not codes:
        return 0.0

    rng = random.Random(seed)

    # Sub-sample the stock universe (deterministic via the seeded RNG).
    if len(codes) > sample_stocks:
        picked = codes if False else rng.sample(codes, sample_stocks)
    else:
        picked = codes

    # "w_momentum" -> "momentum": weight keys carry a "w_" prefix that the
    # indicator dict does not.
    ind_name = {
        wk: (wk[2:] if wk.startswith("w_") else wk) for wk in active_factor_names
    }

    # Shortest per-stock series length across the sample. The first
    # list-typed indicator found for a stock is taken as representative of
    # that stock's series length (one lookup per stock is enough).
    shortest: Optional[int] = None
    for code in picked:
        per_stock = indicators.get(code)
        if per_stock is None:
            continue
        for wk in active_factor_names:
            series = per_stock.get(ind_name[wk])
            if series is not None and isinstance(series, (list, tuple)):
                if shortest is None or len(series) < shortest:
                    shortest = len(series)
                break  # first found series fixes this stock's length

    if shortest is None or shortest < 30:
        return 0.0

    # Pick date indices past the 30-bar warmup window.
    warmup = 30
    candidates = list(range(warmup, shortest))
    if len(candidates) > sample_dates:
        day_indices = sorted(rng.sample(candidates, sample_dates))
    else:
        day_indices = candidates

    # Flatten (stock, day) samples into one score vector per factor.
    # Missing or malformed values fall back to the neutral score 0.5.
    series_by_factor: Dict[str, List[float]] = {wk: [] for wk in active_factor_names}
    for code in picked:
        per_stock = indicators.get(code)
        if per_stock is None:
            continue
        for day in day_indices:
            for wk in active_factor_names:
                raw = per_stock.get(ind_name[wk])
                if raw is None:
                    score = 0.5
                elif isinstance(raw, (list, tuple)):
                    if day < len(raw):
                        v = raw[day]
                        if isinstance(v, (int, float)) and math.isfinite(v):
                            score = float(v)
                        else:
                            score = 0.5
                    else:
                        score = 0.5
                elif isinstance(raw, (int, float)):
                    score = float(raw) if math.isfinite(raw) else 0.5
                else:
                    score = 0.5
                series_by_factor[wk].append(score)

    # Average |r| over all unordered factor pairs.
    names = list(active_factor_names)
    total = 0.0
    pairs = 0
    for i, fa in enumerate(names):
        va = series_by_factor[fa]
        for fb in names[i + 1:]:
            total += abs(_pearson_corr_fast(va, series_by_factor[fb]))
            pairs += 1

    return total / pairs if pairs else 0.0


def _pearson_corr_fast(x: List[float], y: List[float]) -> float:
"""Pearson correlation between two equal-length lists. Returns 0.0 on edge cases."""
n = len(x)
if n == 0 or n != len(y):
return 0.0
mean_x = sum(x) / n
mean_y = sum(y) / n
cov = 0.0
var_x = 0.0
var_y = 0.0
for i in range(n):
dx = x[i] - mean_x
dy = y[i] - mean_y
cov += dx * dy
var_x += dx * dx
var_y += dy * dy
if var_x == 0.0 or var_y == 0.0:
return 0.0
return cov / (math.sqrt(var_x) * math.sqrt(var_y))


def filter_stock_pool(
data: Dict[str, Dict[str, list]],
min_daily_amount: float = 20_000_000.0,
Expand Down Expand Up @@ -1558,6 +1693,27 @@ def _run_backtest(day_start: int, day_end: int) -> Tuple[
_fw = dna.to_dict()
_factor_weights = {k: v for k, v in _fw.items() if k.startswith('w_') or k in (dna.custom_weights or {})}

# ── IC de-duplication: compute average pairwise correlation of active factors ──
# Lightweight approximation: sample 20 stocks × 50 dates from pre-computed indicators.
# Cached per generation via gen_seed on self._ic_corr_cache.
ic_avg_corr: Optional[float] = None
if not hasattr(self, '_ic_corr_cache'):
self._ic_corr_cache: Dict[int, Dict[str, float]] = {}

active_factor_names = [k for k, v in _factor_weights.items() if v >= 0.001]
cache_key = gen_seed
dna_key = ",".join(sorted(active_factor_names))

if cache_key in self._ic_corr_cache and dna_key in self._ic_corr_cache[cache_key]:
ic_avg_corr = self._ic_corr_cache[cache_key][dna_key]
elif len(active_factor_names) >= 2:
ic_avg_corr = _compute_ic_correlation(
active_factor_names, indicators, codes, gen_seed,
)
if cache_key not in self._ic_corr_cache:
self._ic_corr_cache[cache_key] = {}
self._ic_corr_cache[cache_key][dna_key] = ic_avg_corr

train_fitness = compute_fitness(
train_ret, train_dd, train_wr, train_sharpe, train_trades,
sortino=train_sortino,
Expand All @@ -1567,6 +1723,7 @@ def _run_backtest(day_start: int, day_end: int) -> Tuple[
max_positions=dna.max_positions,
avg_turnover=train_avg_turnover,
factor_weights=_factor_weights,
ic_correlation_penalty=ic_avg_corr,
)
val_fitness = compute_fitness(
val_ret, val_dd, val_wr, val_sharpe, val_trades,
Expand All @@ -1577,6 +1734,7 @@ def _run_backtest(day_start: int, day_end: int) -> Tuple[
max_positions=dna.max_positions,
avg_turnover=val_avg_turnover,
factor_weights=_factor_weights,
ic_correlation_penalty=ic_avg_corr,
)

fitness = 0.4 * train_fitness + 0.6 * val_fitness
Expand Down
30 changes: 26 additions & 4 deletions stratevo/evolution/scoring.py
Original file line number Diff line number Diff line change
Expand Up @@ -586,18 +586,26 @@ def compute_fitness(
max_positions: int = 1,
avg_turnover: float = 0.0,
factor_weights: Optional[Dict[str, float]] = None,
ic_correlation_penalty: Optional[float] = None,
) -> float:
"""Compute composite fitness score.

fitness = annual_return * sqrt(win_rate) / max(max_drawdown, 5.0) * sharpe_bonus * trade_penalty
* sortino_bonus * consec_loss_penalty * consistency_bonus * diversification_bonus
* turnover_penalty * factor_diversity_bonus
* turnover_penalty * factor_diversity_bonus * ic_dedup_factor

Rewards: high return, high win rate, low drawdown, good Sharpe, enough trades,
Sortino > Sharpe, consistent monthly returns, diversified holdings,
diversified factor usage.
diversified factor usage, low inter-factor IC correlation.
Penalizes: fewer than 30 trades, long consecutive loss streaks, very high turnover,
over-reliance on few factors.
over-reliance on few factors, high IC correlation among active factors.

Args:
ic_correlation_penalty: Average pairwise IC correlation of active factors
(0.0–1.0). If provided, applies a multiplier: low correlation (<0.3)
earns a bonus up to 1.15x; high correlation (>0.7) incurs a penalty
down to 0.7x. Complements the HHI-based factor_diversity_bonus which
only checks weight concentration, not factor similarity.
"""
# Guard against NaN/Inf inputs (can occur from degenerate backtests)
if not math.isfinite(annual_return):
Expand Down Expand Up @@ -713,9 +721,23 @@ def compute_fitness(
else:
factor_diversity_bonus = 0.85 + 0.3 * diversity_ratio

# === IC de-duplication factor (anti-redundancy) ===
# Penalize strategies whose active factors are highly correlated
# (measuring the same signal). Complements HHI which only checks
# weight concentration — this checks actual factor similarity.
ic_dedup_factor = 1.0
if ic_correlation_penalty is not None and math.isfinite(ic_correlation_penalty):
avg_corr = max(0.0, min(1.0, ic_correlation_penalty))
if avg_corr > 0.7:
# High redundancy: linear penalty from 1.0 at 0.7 down to 0.7 at 1.0
ic_dedup_factor = 1.0 - (avg_corr - 0.7) * (0.3 / 0.3) # 0.7 at corr=1.0
elif avg_corr < 0.3:
# Low redundancy: linear bonus from 1.0 at 0.3 up to 1.15 at 0.0
ic_dedup_factor = 1.0 + (0.3 - avg_corr) * (0.15 / 0.3) # 1.15 at corr=0.0

result = (base_fitness * sortino_bonus * consec_loss_penalty
* consistency_bonus * diversification_bonus * turnover_penalty
* factor_diversity_bonus)
* factor_diversity_bonus * ic_dedup_factor)

# Overflow protection: cap to prevent inf from extreme inputs
if not math.isfinite(result):
Expand Down
Loading
Loading