17 changes: 17 additions & 0 deletions flagscale/inference/tuning/__init__.py
@@ -0,0 +1,17 @@
from .ecotune import (
EcoTuneOptimizer,
EvaluationResult,
SearchDimension,
SearchSpace,
Suggestion,
TokenAwareExpectedImprovement,
)

__all__ = [
"EcoTuneOptimizer",
"EvaluationResult",
"SearchDimension",
"SearchSpace",
"Suggestion",
"TokenAwareExpectedImprovement",
]
14 changes: 14 additions & 0 deletions flagscale/inference/tuning/ecotune/__init__.py
@@ -0,0 +1,14 @@
from .acquisition import TokenAwareExpectedImprovement
from .optimizer import EcoTuneOptimizer, EvaluationResult, Suggestion
from .search_space import SearchDimension, SearchSpace
from .surrogate import MultiFidelityGPSurrogate

__all__ = [
"EcoTuneOptimizer",
"EvaluationResult",
"MultiFidelityGPSurrogate",
"SearchDimension",
"SearchSpace",
"Suggestion",
"TokenAwareExpectedImprovement",
]
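Both new __init__.py modules re-export the same public names, so downstream code can pull the API from either the tuning package or the ecotune subpackage. A minimal sketch of the intended import surface, following the file layout in this diff (note that MultiFidelityGPSurrogate is only re-exported at the subpackage level):

# Illustrative import surface based on the __all__ lists above.
from flagscale.inference.tuning import EcoTuneOptimizer, SearchDimension, SearchSpace
from flagscale.inference.tuning.ecotune import MultiFidelityGPSurrogate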
36 changes: 36 additions & 0 deletions flagscale/inference/tuning/ecotune/acquisition.py
@@ -0,0 +1,36 @@
from __future__ import annotations

from typing import Callable

import numpy as np
from scipy.stats import norm

from .surrogate import MultiFidelityGPSurrogate


class TokenAwareExpectedImprovement:
def __init__(
self,
surrogate: MultiFidelityGPSurrogate,
token_cost_fn: Callable[[dict, float], float],
incumbent_fidelity: float,
xi: float = 0.01,
):
self.surrogate = surrogate
self.token_cost_fn = token_cost_fn
self.incumbent_fidelity = float(incumbent_fidelity)
self.xi = float(xi)

def _expected_improvement(self, mu: float, var: float, incumbent: float) -> float:
sigma = float(np.sqrt(max(var, 1e-12)))
improvement = mu - incumbent - self.xi
z = improvement / sigma
ei = improvement * norm.cdf(z) + sigma * norm.pdf(z)
return max(float(ei), 0.0)

def score(self, config: dict, config_vec: np.ndarray, fidelity: float) -> float:
incumbent = self.surrogate.best_score(min_fidelity=self.incumbent_fidelity)
mu, var = self.surrogate.predict(config_vec, fidelity)
ei = self._expected_improvement(mu, var, incumbent)
token_cost = max(float(self.token_cost_fn(config, fidelity)), 1e-9)
return ei / token_cost
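The acquisition value is the standard expected improvement, EI = (mu - f_best - xi) * Phi(z) + sigma * phi(z) with z = (mu - f_best - xi) / sigma, divided by the token cost of running the candidate at the proposed fidelity, so cheap low-fidelity probes win whenever their predicted gain per token is higher. A minimal sketch of calling the class in isolation, using a hypothetical stub in place of MultiFidelityGPSurrogate and an illustrative cost model (neither is part of this commit):

import numpy as np

from flagscale.inference.tuning.ecotune.acquisition import TokenAwareExpectedImprovement

class _StubSurrogate:
    # Hypothetical stand-in exposing only the methods the acquisition uses.
    def best_score(self, min_fidelity: float) -> float:
        return 0.70  # pretend incumbent score at (near) full fidelity

    def predict(self, config_vec: np.ndarray, fidelity: float):
        return 0.75, 0.02  # (posterior mean, variance) for the candidate

def token_cost(config: dict, fidelity: float) -> float:
    return 10_000.0 * fidelity  # illustrative: tokens grow linearly with fidelity

acq = TokenAwareExpectedImprovement(
    surrogate=_StubSurrogate(),
    token_cost_fn=token_cost,
    incumbent_fidelity=1.0 - 1e-8,
)
# EI per token for a single-dimension candidate probed at half fidelity;
# "batch_size" is an illustrative dimension name.
print(acq.score({"batch_size": 8}, np.array([0.5]), fidelity=0.5))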
215 changes: 215 additions & 0 deletions flagscale/inference/tuning/ecotune/optimizer.py
@@ -0,0 +1,215 @@
from __future__ import annotations

from dataclasses import dataclass
from typing import Callable, Dict, List, Optional

import numpy as np

from .acquisition import TokenAwareExpectedImprovement
from .search_space import SearchSpace
from .surrogate import MultiFidelityGPSurrogate


@dataclass
class Suggestion:
config: Dict[str, object]
fidelity: float
acquisition_score: float
promoted: bool


@dataclass
class EvaluationResult:
score: float
token_cost: float
metadata: Optional[dict] = None


@dataclass
class Observation:
config: Dict[str, object]
fidelity: float
score: float
token_cost: float
cumulative_tokens: float
promoted: bool


class EcoTuneOptimizer:
def __init__(
self,
search_space: SearchSpace,
token_cost_fn: Callable[[Dict[str, object], float], float],
total_budget: float,
fidelity_levels: Optional[List[float]] = None,
r_min: float = 0.05,
r_max: float = 1.0,
promotion_threshold: float = 0.05,
initial_design_size: int = 6,
candidates_per_step: int = 64,
random_seed: int = 42,
bootstrap_config: Optional[Dict[str, object]] = None,
bootstrap_fidelity: Optional[float] = None,
):
self.search_space = search_space
self.token_cost_fn = token_cost_fn
self.total_budget = float(total_budget)
self.remaining_budget = float(total_budget)
self.promotion_threshold = float(promotion_threshold)
self.initial_design_size = int(initial_design_size)
self.candidates_per_step = int(candidates_per_step)
self.r_min = float(r_min)
self.r_max = float(r_max)
self.fidelity_levels = fidelity_levels or list(np.linspace(self.r_min, self.r_max, 10))
self.rng = np.random.default_rng(random_seed)
self.surrogate = MultiFidelityGPSurrogate(n_dims=search_space.n_dims)
self.acquisition = TokenAwareExpectedImprovement(
surrogate=self.surrogate,
token_cost_fn=self.token_cost_fn,
incumbent_fidelity=self.r_max - 1e-8,
)
self.bootstrap_config = dict(bootstrap_config) if bootstrap_config is not None else None
self.bootstrap_fidelity = (
float(bootstrap_fidelity) if bootstrap_fidelity is not None else None
)
self.history: List[Observation] = []
self.best_score = -np.inf
self.best_config: Dict[str, object] = {}
self._pending: Optional[Suggestion] = None

def _affordable_fidelities(self, config: Dict[str, object]) -> List[float]:
return [
fidelity
for fidelity in self.fidelity_levels
if self.token_cost_fn(config, fidelity) <= self.remaining_budget
]

def _initial_suggestion(self) -> Suggestion:
config = self.search_space.sample(self.rng, n=1)[0]
affordable = self._affordable_fidelities(config)
if not affordable:
raise RuntimeError("No affordable fidelity remains within budget")
fidelity = float(affordable[min(len(self.history), len(affordable) - 1)])
return Suggestion(config=config, fidelity=fidelity, acquisition_score=0.0, promoted=False)

def _model_based_suggestion(self) -> Suggestion:
candidates = self.search_space.sample(self.rng, n=self.candidates_per_step)
best: Optional[Suggestion] = None

for config in candidates:
config_vec = self.search_space.to_vector(config)
affordable = self._affordable_fidelities(config)
if not affordable:
continue

scores = [
self.acquisition.score(config, config_vec, fidelity) for fidelity in affordable
]
score_at_rmax = -np.inf
promoted = False
if self.r_max in affordable:
score_at_rmax = self.acquisition.score(config, config_vec, self.r_max)
promoted = score_at_rmax > self.promotion_threshold

if promoted:
fidelity = self.r_max
acquisition_score = score_at_rmax
else:
best_idx = int(np.argmax(scores))
fidelity = float(affordable[best_idx])
acquisition_score = float(scores[best_idx])

suggestion = Suggestion(
config=config,
fidelity=fidelity,
acquisition_score=float(acquisition_score),
promoted=promoted,
)
if best is None or suggestion.acquisition_score > best.acquisition_score:
best = suggestion

if best is None:
raise RuntimeError("Unable to generate an affordable suggestion")
return best

def ask(self) -> Suggestion:
if self.remaining_budget <= 0:
raise RuntimeError("Token budget exhausted")
if self._pending is not None:
return self._pending

if self.bootstrap_config is not None and self.surrogate.num_observations == 0:
affordable = self._affordable_fidelities(self.bootstrap_config)
if not affordable:
raise RuntimeError("Bootstrap config is not affordable within budget")
if self.bootstrap_fidelity is None:
fidelity = affordable[0]
else:
fidelity = min(affordable, key=lambda x: abs(x - self.bootstrap_fidelity))
self._pending = Suggestion(
config=dict(self.bootstrap_config),
fidelity=float(fidelity),
acquisition_score=0.0,
promoted=False,
)
return self._pending

if self.surrogate.num_observations < self.initial_design_size:
self._pending = self._initial_suggestion()
else:
self._pending = self._model_based_suggestion()
return self._pending

def tell(self, result: EvaluationResult) -> None:
if self._pending is None:
raise RuntimeError("tell() called before ask()")
if result.token_cost > self.remaining_budget + 1e-8:
raise ValueError("Reported token cost exceeds remaining budget")
self._record_observation(
config=self._pending.config,
fidelity=self._pending.fidelity,
result=result,
promoted=bool(self._pending.promoted),
)
self._pending = None

def _record_observation(
self,
config: Dict[str, object],
fidelity: float,
result: EvaluationResult,
promoted: bool,
) -> None:
config_vec = self.search_space.to_vector(config)
self.surrogate.add_observation(config_vec, fidelity, result.score)
self.remaining_budget = max(0.0, self.remaining_budget - float(result.token_cost))
cumulative = self.total_budget - self.remaining_budget
if result.score > self.best_score:
self.best_score = float(result.score)
self.best_config = dict(config)
self.history.append(
Observation(
config=dict(config),
fidelity=float(fidelity),
score=float(result.score),
token_cost=float(result.token_cost),
cumulative_tokens=float(cumulative),
promoted=bool(promoted),
)
)

def should_stop(self, max_steps: Optional[int] = None) -> bool:
if self.remaining_budget <= 0:
return True
if max_steps is not None and len(self.history) >= max_steps:
return True
return False

def summary(self) -> dict:
return {
"total_budget": self.total_budget,
"remaining_budget": self.remaining_budget,
"num_trials": len(self.history),
"best_score": self.best_score,
"best_config": self.best_config,
}
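The optimizer is driven through an ask/tell loop: ask() proposes a (config, fidelity) pair, the caller runs the benchmark, and tell() feeds back the score and the tokens actually spent. A sketch of that loop, assuming MultiFidelityGPSurrogate from surrogate.py (not shown in this commit) provides the add_observation/predict/best_score/num_observations interface used above; the dimensions, cost model, and benchmark below are placeholders:

import numpy as np

from flagscale.inference.tuning import (
    EcoTuneOptimizer,
    EvaluationResult,
    SearchDimension,
    SearchSpace,
)

space = SearchSpace([
    SearchDimension("max_num_seqs", low=16, high=512, kind="int"),  # placeholder name
    SearchDimension("gpu_memory_utilization", low=0.5, high=0.95),  # placeholder name
])

def token_cost(config, fidelity):
    return 50_000.0 * fidelity  # placeholder: cost scales with fidelity

def run_benchmark(config, fidelity):
    return float(np.random.default_rng().uniform())  # placeholder objective

opt = EcoTuneOptimizer(
    search_space=space,
    token_cost_fn=token_cost,
    total_budget=2_000_000.0,
)
while not opt.should_stop(max_steps=30):
    suggestion = opt.ask()
    score = run_benchmark(suggestion.config, suggestion.fidelity)
    opt.tell(EvaluationResult(
        score=score,
        token_cost=token_cost(suggestion.config, suggestion.fidelity),
    ))

print(opt.summary())

Suggestions with suggestion.promoted set are full-fidelity re-evaluations of configurations whose acquisition score at r_max exceeds promotion_threshold.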
63 changes: 63 additions & 0 deletions flagscale/inference/tuning/ecotune/search_space.py
@@ -0,0 +1,63 @@
from __future__ import annotations

from dataclasses import dataclass
from typing import Any, Dict, List, Sequence

import numpy as np


@dataclass(frozen=True)
class SearchDimension:
name: str
low: float
high: float
kind: str = "float"

def sample(self, rng: np.random.Generator) -> Any:
value = rng.uniform(self.low, self.high)
if self.kind == "int":
return int(round(value))
return float(value)

def denormalize(self, value: float) -> Any:
raw = self.low + float(value) * (self.high - self.low)
if self.kind == "int":
return int(round(raw))
return float(raw)

def normalize(self, value: Any) -> float:
return (float(value) - self.low) / (self.high - self.low + 1e-12)


class SearchSpace:
def __init__(self, dimensions: Sequence[SearchDimension]):
if not dimensions:
raise ValueError("SearchSpace requires at least one dimension")
self._dimensions = list(dimensions)

@property
def dimensions(self) -> List[SearchDimension]:
return list(self._dimensions)

@property
def n_dims(self) -> int:
return len(self._dimensions)

def sample(self, rng: np.random.Generator, n: int = 1) -> List[Dict[str, Any]]:
configs: List[Dict[str, Any]] = []
for _ in range(n):
configs.append({dim.name: dim.sample(rng) for dim in self._dimensions})
return configs

def to_vector(self, config: Dict[str, Any]) -> np.ndarray:
return np.asarray(
[dim.normalize(config[dim.name]) for dim in self._dimensions],
dtype=np.float64,
)

def from_vector(self, vector: np.ndarray) -> Dict[str, Any]:
vec = np.asarray(vector, dtype=np.float64)
return {
dim.name: dim.denormalize(np.clip(vec[idx], 0.0, 1.0))
for idx, dim in enumerate(self._dimensions)
}
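SearchSpace maps between user-facing config dicts and the unit-cube vectors the surrogate consumes; integer dimensions are re-rounded on the way back. A short sketch of that round trip, with illustrative dimension names:

import numpy as np

from flagscale.inference.tuning.ecotune.search_space import SearchDimension, SearchSpace

space = SearchSpace([
    SearchDimension("block_size", low=8, high=128, kind="int"),  # illustrative name
    SearchDimension("swap_space_gb", low=0.0, high=16.0),        # kind defaults to "float"
])

rng = np.random.default_rng(0)
config = space.sample(rng, n=1)[0]   # dict keyed by dimension name
vec = space.to_vector(config)        # values normalized into [0, 1)
restored = space.from_vector(vec)    # clipped, denormalized, ints re-rounded
print(config, vec, restored)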