Skip to content

Optimizer

midas.optimizer

Bayesian optimizer for strategy parameters using Optuna TPE.

optimize(portfolio, price_data, start, end, strategy_names=None, n_trials=DEFAULT_N_TRIALS, min_cash_pct=DEFAULT_MIN_CASH_PCT, log_fn=None)

Bayesian optimization over strategy parameters using Optuna TPE.

Runs n_trials Optuna trials (default 200). Each trial samples a parameter combination via the Tree-structured Parzen Estimator and evaluates it with a full backtest. Backtests are executed in a worker pool to utilise multiple CPU cores.

Source code in src/midas/optimizer.py
def optimize(
    portfolio: PortfolioConfig,
    price_data: dict[str, pd.Series],
    start: date,
    end: date,
    strategy_names: list[str] | None = None,
    n_trials: int = DEFAULT_N_TRIALS,
    min_cash_pct: float = DEFAULT_MIN_CASH_PCT,
    log_fn: Callable[[str], None] | None = None,
) -> OptimizeResult:
    """Bayesian optimization over strategy parameters using Optuna TPE.

    Runs *n_trials* Optuna trials (default 200).  Each trial samples a
    parameter combination via the Tree-structured Parzen Estimator and
    evaluates it with a full backtest.  Backtests are executed in a worker
    pool to utilise multiple CPU cores.

    Args:
        portfolio: Portfolio configuration; its holdings determine the
            ``max_position_pct`` search range.
        price_data: Per-ticker price series used by the backtests.
        start: Backtest start date.
        end: Backtest end date.
        strategy_names: Strategies to optimize.  Defaults to every strategy
            in ``PARAM_RANGES``.  Names without defined ranges, and
            MECHANICAL-tier strategies, are filtered out.
        n_trials: Number of Optuna trials to run.
        min_cash_pct: Minimum cash fraction; excluded from optimization but
            used to derive the equal-weight position size.
        log_fn: Optional progress logger; silent if None.

    Returns:
        OptimizeResult with the best parameter set and its metrics.

    Raises:
        ValueError: If no optimizable strategies remain after filtering.
    """
    log = log_fn or (lambda _: None)

    names = strategy_names or [
        k for k in PARAM_RANGES if k != _GLOBAL_KEY
    ]
    # Filter to strategies that have defined ranges and are not MECHANICAL
    names = [
        n for n in names
        if n in PARAM_RANGES
        and STRATEGY_REGISTRY[n]().tier != StrategyTier.MECHANICAL
    ]

    if not names:
        msg = "No optimizable strategies found"
        raise ValueError(msg)

    # Always include global allocation knobs
    names.append(_GLOBAL_KEY)

    # Compute max_position_pct range from portfolio size.
    n_tickers = sum(1 for h in portfolio.holdings if h.shares > 0)
    equal_weight = (1.0 - min_cash_pct) / max(n_tickers, 1)
    # Bounds match the allocator's warning thresholds: 1.5x-5x equal weight,
    # clamped to [0.10, 0.80].
    lo = max(round(1.5 * equal_weight, 2), 0.10)
    hi = min(round(5.0 * equal_weight, 2), 0.80)
    if lo >= hi:
        # Degenerate range (e.g. very large portfolios): fall back to the
        # full clamp interval.
        lo, hi = 0.10, 0.80
    # 8 steps across the range; `or 0.01` guards against a zero step when
    # the range rounds down to nothing.
    step = round((hi - lo) / 8, 2) or 0.01
    ranges = {k: dict(PARAM_RANGES[k]) for k in names if k in PARAM_RANGES}
    ranges.setdefault(_GLOBAL_KEY, {})
    ranges[_GLOBAL_KEY]["max_position_pct"] = (lo, hi, step)

    # Use half the logical cores (backtests are CPU-heavy), but never more
    # workers than trials, and always at least one.
    max_workers = min((os.cpu_count() or 4) // 2, n_trials) or 1

    # BUG FIX: the original f-string had no separator between the joined
    # strategy names and the trial count ("...rsi200 trials").
    log(f"Optimizing {', '.join(names)}: {n_trials} trials ({max_workers} workers)")
    log(
        f"  max_position_pct range: {lo:.2f}-{hi:.2f}"
        f" (equal weight: {equal_weight:.2f})"
    )

    # Suppress Optuna's default logging (we provide our own via log_fn).
    optuna.logging.set_verbosity(optuna.logging.WARNING)

    study = optuna.create_study(
        direction="maximize",
        sampler=optuna.samplers.TPESampler(seed=42),
    )

    # -- Objective that runs in the main process but farms backtest to pool --
    pool = ProcessPoolExecutor(
        max_workers=max_workers,
        initializer=_init_worker,
        initargs=(portfolio, price_data, start, end, min_cash_pct),
    )

    trials_done = 0

    def objective(trial: optuna.Trial) -> float:
        nonlocal trials_done

        strategy_params: dict[str, dict[str, float]] = {}
        for name in names:
            strategy_params[name] = _suggest_params(
                trial, name, ranges[name],
            )

        # Run the actual backtest in a worker process.
        total_ret, bh_ret, train_ret, test_ret = pool.submit(
            _trial_worker, strategy_params,
        ).result()

        # Store auxiliary metrics as user attributes for later retrieval.
        trial.set_user_attr("bh_return", bh_ret)
        trial.set_user_attr("train_return", train_ret)
        trial.set_user_attr("test_return", test_ret)
        trial.set_user_attr("params", strategy_params)

        trials_done += 1
        if trials_done % 25 == 0 or trials_done == n_trials:
            # BUG FIX: study.best_value raises ValueError when no trial has
            # completed yet (the current trial hasn't returned, so with
            # n_trials == 1 this branch fired before any completion).
            try:
                best_so_far = f"{study.best_value:.2%}"
            except ValueError:
                best_so_far = "n/a"
            log(f"  {trials_done}/{n_trials} — best so far: {best_so_far}")

        return total_ret

    try:
        study.optimize(objective, n_trials=n_trials)
    finally:
        # Don't block on in-flight backtests; the process is tearing down.
        pool.shutdown(wait=False)

    best = study.best_trial
    best_params: dict[str, dict[str, float]] = best.user_attrs["params"]

    log(f"Best return: {best.value:.2%} in {len(study.trials)} trials")

    return OptimizeResult(
        best_params=best_params,
        best_return=round(best.value, 4),
        best_bh_return=round(best.user_attrs["bh_return"], 4),
        best_train_return=round(best.user_attrs["train_return"], 4),
        best_test_return=round(best.user_attrs["test_return"], 4),
        trials_run=len(study.trials),
    )

write_strategies_yaml(params, path, min_cash_pct=DEFAULT_MIN_CASH_PCT)

Write optimized parameters to a strategies YAML file.

Source code in src/midas/optimizer.py
def write_strategies_yaml(
    params: dict[str, dict[str, float]],
    path: str,
    min_cash_pct: float = DEFAULT_MIN_CASH_PCT,
) -> None:
    """Write optimized parameters to a strategies YAML file.

    Args:
        params: Mapping of strategy name to parameter dict, as produced by
            :func:`optimize`.  The ``_GLOBAL_KEY`` entry holds top-level
            allocation knobs; within a strategy, the pseudo-keys ``_weight``
            and ``_veto_threshold`` become the ``weight`` and
            ``veto_threshold`` entry fields rather than ``params`` values.
        path: Destination file path (overwritten if it exists).
        min_cash_pct: User-configured cash floor, written verbatim since it
            is not optimized.
    """
    output: dict[str, object] = {}

    # Emit global allocation knobs as top-level keys
    if _GLOBAL_KEY in params:
        for k, v in params[_GLOBAL_KEY].items():
            output[k] = round(v, 4)

    # min_cash_pct is not optimized — preserve the user's configured value
    output["min_cash_pct"] = round(min_cash_pct, 4)

    strategies = []
    for name, p in params.items():
        if name == _GLOBAL_KEY:
            continue
        entry: dict[str, object] = {"name": name}
        clean_params: dict[str, object] = {}
        for k, v in p.items():
            if k == "_weight":
                entry["weight"] = round(v, 4)
            elif k == "_veto_threshold":
                entry["veto_threshold"] = round(v, 4)
            elif k in _INT_PARAMS:
                # Integer-valued parameters are stored as floats by the
                # sampler; coerce back so the YAML reads cleanly.
                clean_params[k] = int(v)
            else:
                clean_params[k] = round(v, 4)
        if clean_params:
            entry["params"] = clean_params
        strategies.append(entry)

    output["strategies"] = strategies

    # FIX: write explicitly as UTF-8 instead of the locale-dependent default
    # encoding, so strategy names with non-ASCII characters round-trip.
    with open(path, "w", encoding="utf-8") as f:
        yaml.dump(output, f, default_flow_style=False)