def _position_pct_range(equal_weight: float) -> tuple[float, float, float]:
    """Derive the (low, high, step) search range for ``max_position_pct``.

    Bounds match the allocator's warning thresholds: 1.5x-5x equal weight,
    clamped to [0.10, 0.80]. If the clamped bounds collapse (lo >= hi, e.g.
    for very small or very large portfolios) the full [0.10, 0.80] range is
    used. The step divides the range into at most 8 increments and is never
    zero (floor of 0.01).
    """
    lo = max(round(1.5 * equal_weight, 2), 0.10)
    hi = min(round(5.0 * equal_weight, 2), 0.80)
    if lo >= hi:
        lo, hi = 0.10, 0.80
    step = round((hi - lo) / 8, 2) or 0.01
    return lo, hi, step


def optimize(
    portfolio: PortfolioConfig,
    price_data: dict[str, pd.Series],
    start: date,
    end: date,
    strategy_names: list[str] | None = None,
    n_trials: int = DEFAULT_N_TRIALS,
    min_cash_pct: float = DEFAULT_MIN_CASH_PCT,
    log_fn: Callable[[str], None] | None = None,
) -> OptimizeResult:
    """Bayesian optimization over strategy parameters using Optuna TPE.

    Runs *n_trials* Optuna trials. Each trial samples a parameter
    combination via the Tree-structured Parzen Estimator and evaluates it
    with a full backtest. Trials run concurrently (``n_jobs``) so the
    process-pool backtest workers are actually saturated; note that with
    concurrent trials the seeded sampler is no longer bit-reproducible.

    Args:
        portfolio: Portfolio whose holdings define the position sizing range.
        price_data: Ticker -> price series used by the backtest workers.
        start: Backtest start date.
        end: Backtest end date.
        strategy_names: Strategies to optimize; defaults to every strategy
            with defined parameter ranges. ``_GLOBAL_KEY`` entries are
            ignored — global allocation knobs are always included.
        n_trials: Number of Optuna trials to run.
        min_cash_pct: Minimum cash fraction reserved by the allocator.
        log_fn: Optional progress logger; silent when ``None``.

    Returns:
        OptimizeResult with the best parameters and associated returns.

    Raises:
        ValueError: If no optimizable strategy remains after filtering.
    """
    import threading

    log = log_fn or (lambda _: None)
    candidates = strategy_names or list(PARAM_RANGES)
    # Keep strategies that have defined ranges and are not MECHANICAL.
    # _GLOBAL_KEY is excluded here (even if a caller passed it explicitly):
    # it is not a registry entry, and it is appended exactly once below.
    names = [
        n for n in candidates
        if n != _GLOBAL_KEY
        and n in PARAM_RANGES
        and STRATEGY_REGISTRY[n]().tier != StrategyTier.MECHANICAL
    ]
    if not names:
        msg = "No optimizable strategies found"
        raise ValueError(msg)
    # Always include global allocation knobs.
    names.append(_GLOBAL_KEY)

    # Compute the max_position_pct search range from the portfolio size.
    n_tickers = sum(1 for h in portfolio.holdings if h.shares > 0)
    equal_weight = (1.0 - min_cash_pct) / max(n_tickers, 1)
    lo, hi, step = _position_pct_range(equal_weight)

    ranges = {k: dict(PARAM_RANGES[k]) for k in names if k in PARAM_RANGES}
    ranges.setdefault(_GLOBAL_KEY, {})
    ranges[_GLOBAL_KEY]["max_position_pct"] = (lo, hi, step)

    max_workers = min((os.cpu_count() or 4) // 2, n_trials) or 1
    log(f"Optimizing {', '.join(names)} — {n_trials} trials ({max_workers} workers)")
    log(
        f" max_position_pct range: {lo:.2f}-{hi:.2f}"
        f" (equal weight: {equal_weight:.2f})"
    )

    # Suppress Optuna's default logging (we provide our own via log_fn).
    optuna.logging.set_verbosity(optuna.logging.WARNING)
    study = optuna.create_study(
        direction="maximize",
        sampler=optuna.samplers.TPESampler(seed=42),
    )

    # Backtests are farmed out to a process pool; the objective itself runs
    # in Optuna worker threads (see n_jobs below).
    pool = ProcessPoolExecutor(
        max_workers=max_workers,
        initializer=_init_worker,
        initargs=(portfolio, price_data, start, end, min_cash_pct),
    )
    progress_lock = threading.Lock()
    trials_done = 0

    def objective(trial: optuna.Trial) -> float:
        nonlocal trials_done
        strategy_params: dict[str, dict[str, float]] = {}
        for name in names:
            strategy_params[name] = _suggest_params(
                trial, name, ranges[name],
            )
        total_ret, bh_ret, train_ret, test_ret = pool.submit(
            _trial_worker, strategy_params,
        ).result()
        # Store auxiliary metrics as user attributes for later retrieval.
        trial.set_user_attr("bh_return", bh_ret)
        trial.set_user_attr("train_return", train_ret)
        trial.set_user_attr("test_return", test_ret)
        trial.set_user_attr("params", strategy_params)
        with progress_lock:  # objective runs on multiple threads (n_jobs)
            trials_done += 1
            done = trials_done
        if done % 25 == 0 or done == n_trials:
            # study.best_value raises ValueError until at least one trial
            # has *completed*, and the current trial only completes after
            # this function returns (n_trials == 1 used to crash here).
            try:
                best_so_far = study.best_value
            except ValueError:
                best_so_far = total_ret
            log(f" {done}/{n_trials} — best so far: {best_so_far:.2%}")
        return total_ret

    try:
        # n_jobs runs trials in threads so the process pool is saturated;
        # a serial loop would block on each .result() and leave all but one
        # worker idle.
        study.optimize(objective, n_trials=n_trials, n_jobs=max_workers)
    finally:
        # Best-effort teardown: don't block on in-flight backtests, and
        # drop anything still queued (e.g. on KeyboardInterrupt).
        pool.shutdown(wait=False, cancel_futures=True)

    best = study.best_trial
    best_params: dict[str, dict[str, float]] = best.user_attrs["params"]
    log(f"Best return: {best.value:.2%} in {len(study.trials)} trials")
    return OptimizeResult(
        best_params=best_params,
        best_return=round(best.value, 4),
        best_bh_return=round(best.user_attrs["bh_return"], 4),
        best_train_return=round(best.user_attrs["train_return"], 4),
        best_test_return=round(best.user_attrs["test_return"], 4),
        trials_run=len(study.trials),
    )