Conversation

@ChuckJonas (Contributor) commented Nov 4, 2025

WIP: submitted code in POC stage
Based on discussions from #3023

AdaptiveModel is a new model type that provides full control over model selection at runtime. Unlike FallbackModel which tries models sequentially, AdaptiveModel allows custom logic to select the next model based on rich context including attempts, exceptions, and agent dependencies.

Core API

class AdaptiveModel[AgentDepsT](Model):
    def __init__(
        self,
        models: Sequence[Model],
        selector: Callable[[AdaptiveContext[AgentDepsT]], Model | None] 
                  | Callable[[AdaptiveContext[AgentDepsT]], Awaitable[Model | None]],
        *,
        max_attempts: int | None = None,
    ):
        """
        Args:
            models: Pool of models to choose from
            selector: Sync or async function that selects the next model to try
            max_attempts: Maximum total attempts across all models (None = unlimited)
        """

Execution Flow

The AdaptiveModel automatically handles fallback and retry (see the sketch after this list) by:

  1. Calling selector(context) to get the next model to try
  2. Attempting the request with that model
  3. If the request succeeds, returning the result
  4. If the request fails:
    • Recording the attempt (model + exception) in context.attempts
    • Calling selector(context) again with the updated context
    • If the selector returns a Model, going back to step 2 (retry/fallback)
    • If the selector returns None, raising an exception group with all failures
  5. If max_attempts is reached, stopping and raising the exception group
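
In rough pseudocode, the flow above might look like this (a simplified sketch of the proposed behavior, not the actual implementation; assumes Python 3.11+ for ExceptionGroup):

import time

async def _request_with_fallback(self, messages, model_settings, model_request_parameters):
    attempts: list[AttemptResult] = []
    while self._max_attempts is None or len(attempts) < self._max_attempts:
        context = AdaptiveContext(
            run_context=None,  # only populated for streaming requests (see notes below)
            models=self.models,
            attempts=attempts,
            attempt_number=len(attempts) + 1,
            messages=messages,
            model_settings=model_settings,
            model_request_parameters=model_request_parameters,
        )
        model = await self._call_selector(context)  # awaits async selectors, calls sync ones directly
        if model is None:
            break  # selector gave up
        start = time.time()
        try:
            return await model.request(messages, model_settings, model_request_parameters)
        except Exception as exc:
            attempts.append(AttemptResult(model, exc, start, time.time() - start))
    raise ExceptionGroup('All models failed', [a.exception for a in attempts if a.exception])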

The selector has full control over retry/fallback logic (see the example after this list):

  • Return the same model that failed → retry with same model
  • Return a different model → fallback to another model
  • Return None → stop trying
  • Inspect ctx.attempts[-1].exception to make decisions
  • Use time.sleep() or await asyncio.sleep() for backoff/waiting (selector can be sync or async)
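
For example, a minimal selector that retries each model once with a short backoff before falling back (a sketch against the proposed API):

import asyncio

async def retry_then_fallback(ctx: AdaptiveContext) -> Model | None:
    if not ctx.attempts:
        return ctx.models[0]  # first attempt: start with the primary model
    last = ctx.attempts[-1]
    if sum(a.model is last.model for a in ctx.attempts) < 2:
        await asyncio.sleep(1)  # brief backoff, then retry the same model
        return last.model
    tried = {id(a.model) for a in ctx.attempts}
    return next((m for m in ctx.models if id(m) not in tried), None)  # None = stop trying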

Context

@dataclass
class AdaptiveContext[AgentDepsT]:
    """Context provided to the selector function."""
    
    run_context: RunContext[AgentDepsT] | None  # Access to agent dependencies (None for non-streaming)
    models: Sequence[Model]  # Available models
    attempts: list[AttemptResult]  # History of attempts in this request
    attempt_number: int  # Current attempt number (1-indexed)
    messages: list[ModelMessage]  # The original request
    model_settings: ModelSettings | None
    model_request_parameters: ModelRequestParameters

@dataclass
class AttemptResult:
    """Record of a single attempt."""
    model: Model
    exception: Exception | None
    timestamp: float
    duration: float  # seconds

Important Implementation Notes

Agent Dependencies Availability:

  • run_context (and thus ctx.run_context.deps) is only available in streaming mode (run_stream)
    • For non-streaming requests (run, run_sync), ctx.run_context will be None
    • This is due to the base Model.request() API not accepting a run_context parameter
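
Until that changes, a selector that relies on deps should guard against a missing run_context. A sketch against the proposed API, where MyDeps is a hypothetical deps type:

from dataclasses import dataclass

@dataclass
class MyDeps:
    prefer_cheap: bool

def deps_aware_selector(ctx: AdaptiveContext[MyDeps]) -> Model | None:
    tried = {id(a.model) for a in ctx.attempts}
    untried = [m for m in ctx.models if id(m) not in tried]
    if ctx.run_context is None:
        # Non-streaming request (run/run_sync): deps unavailable, fall back to sequential selection
        return untried[0] if untried else None
    if ctx.run_context.deps.prefer_cheap and not ctx.attempts:
        return ctx.models[-1]  # e.g. the cheapest model in the pool
    return untried[0] if untried else None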

Use Cases

1. Throttling with Timeout

Handle rate limiting by timing out models for 30 seconds after throttling errors.

import asyncio
import time

throttled_models = {}  # model_id -> timestamp

async def throttle_aware_selector(ctx: AdaptiveContext) -> Model | None:
    # Record throttling from last attempt
    if ctx.attempts:
        last = ctx.attempts[-1]
        if last.exception and 'throttl' in str(last.exception).lower():
            throttled_models[id(last.model)] = time.time()
    
    # Find first available model
    for model in ctx.models:
        model_id = id(model)
        if model_id in throttled_models:
            if time.time() - throttled_models[model_id] < 30:
                continue
            del throttled_models[model_id]
        return model
    
    # All throttled - wait for soonest available
    if throttled_models and ctx.attempt_number < 10:
        soonest = min(throttled_models.items(), key=lambda x: x[1])
        wait_time = 30 - (time.time() - soonest[1])
        if wait_time > 0:
            await asyncio.sleep(wait_time)
            del throttled_models[soonest[0]]
            return next(m for m in ctx.models if id(m) == soonest[0])
    
    return None

adaptive = AdaptiveModel(
    models=[primary, secondary, tertiary],
    selector=throttle_aware_selector,
    max_attempts=15
)

2. Load Balancing

Distribute requests evenly across multiple accounts/instances.

call_counts = {}  # model_id -> count

def round_robin_selector(ctx: AdaptiveContext) -> Model | None:
    if not ctx.attempts:
        # Use least-used model
        model = min(ctx.models, key=lambda m: call_counts.get(id(m), 0))
        call_counts[id(model)] = call_counts.get(id(model), 0) + 1
        return model
    
    # On retry, try next least-used
    failed = {id(a.model) for a in ctx.attempts}
    available = [m for m in ctx.models if id(m) not in failed]
    return min(available, key=lambda m: call_counts.get(id(m), 0)) if available else None

from pydantic_ai.providers.openai import OpenAIProvider

# Load balance across accounts
adaptive = AdaptiveModel(
    models=[
        OpenAIChatModel('gpt-4o', provider=OpenAIProvider(api_key=key1)),
        OpenAIChatModel('gpt-4o', provider=OpenAIProvider(api_key=key2)),
    ],
    selector=round_robin_selector
)

3. User Tier-Based Selection

Route to different models based on user subscription level.

from dataclasses import dataclass

@dataclass
class UserContext:
    tier: str  # 'free', 'pro', 'enterprise'
    monthly_tokens_used: int
    monthly_token_limit: int

def tier_based_selector(ctx: AdaptiveContext[UserContext]) -> Model | None:
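    # NB: run_context is only populated in streaming mode (see implementation notes above)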
    user = ctx.run_context.deps
    
    if not ctx.attempts:
        if user.tier == 'enterprise':
            return next((m for m in ctx.models if 'gpt-4o' in m.model_name), ctx.models[0])
        elif user.tier == 'pro':
            # Check usage limits
            if user.monthly_tokens_used < user.monthly_token_limit * 0.9:
                return next((m for m in ctx.models if 'gpt-4o-mini' in m.model_name), ctx.models[0])
            return next((m for m in ctx.models if 'gpt-3.5' in m.model_name), ctx.models[0])
        else:
            return next((m for m in ctx.models if 'gpt-3.5' in m.model_name), ctx.models[0])
    
    # Retry with next model
    tried = {id(a.model) for a in ctx.attempts}
    available = [m for m in ctx.models if id(m) not in tried]
    return available[0] if available else None

adaptive = AdaptiveModel(
    models=[gpt35, gpt4mini, gpt4],
    selector=tier_based_selector
)

agent = Agent(adaptive, deps_type=UserContext)
result = agent.run_sync(
    'Explain quantum computing',
    deps=UserContext(tier='pro', monthly_tokens_used=80000, monthly_token_limit=100000)
)

4. Extended Context Model Upgrade

Automatically upgrade to a long-context model when conversation exceeds a threshold.

def context_aware_selector(ctx: AdaptiveContext) -> Model | None:
    """Upgrade to long-context model when conversation gets large."""
    
    # Count messages in conversation
    message_count = len(ctx.messages)
    
    if not ctx.attempts:
        # First attempt - choose based on context size
        if message_count > 50:
            # Use long-context model for large conversations
            return next((m for m in ctx.models if 'claude-3-7-sonnet' in m.model_name), ctx.models[0])
        else:
            # Use standard model for normal conversations
            return next((m for m in ctx.models if 'gpt-4o-mini' in m.model_name), ctx.models[0])
    
    # On retry, try next available model
    tried = {id(a.model) for a in ctx.attempts}
    available = [m for m in ctx.models if id(m) not in tried]
    return available[0] if available else None

# Models sorted by context length capability
adaptive = AdaptiveModel(
    models=[
        OpenAIChatModel('gpt-4o-mini'),              # 128K context
        OpenAIChatModel('gpt-4o'),                   # 128K context
        AnthropicModel('claude-3-7-sonnet-latest'),  # 200K context
    ],
    selector=context_aware_selector
)

agent = Agent(adaptive)

# Short conversation uses cheap model
result1 = agent.run_sync('Hello')  # Uses gpt-4o-mini

# Long conversation automatically upgrades
messages = result1.new_messages()
for i in range(60):
    result = agent.run_sync(f'Message {i}', message_history=messages)
    messages = result.new_messages()
# Automatically switched to claude-3-7-sonnet due to message count

5. Cost-Optimized with Quality Fallback

Start with cheap models, upgrade if quality is insufficient.

from dataclasses import dataclass, field

@dataclass
class QualityTracker:
    quality_threshold: float = 0.8
    model_history: dict[str, list[float]] = field(default_factory=dict)  # model name -> quality scores

def cost_quality_selector(ctx: AdaptiveContext[QualityTracker]) -> Model | None:
    tracker = ctx.run_context.deps
    
    if not ctx.attempts:
        # Check if cheap model historically meets quality threshold
        cheap_model = ctx.models[0]
        avg_quality = (
            sum(tracker.model_history.get(cheap_model.model_name, [1.0])) / 
            len(tracker.model_history.get(cheap_model.model_name, [1.0]))
        )
        
        if avg_quality >= tracker.quality_threshold:
            return cheap_model
        # Quality too low, start with better model
        return ctx.models[1] if len(ctx.models) > 1 else cheap_model
    
    # Upgrade on failure
    tried = {id(a.model) for a in ctx.attempts}
    available = [m for m in ctx.models if id(m) not in tried]
    return available[0] if available else None

adaptive = AdaptiveModel(
    models=[gpt35, gpt4mini, gpt4],  # Sorted by cost
    selector=cost_quality_selector
)

agent = Agent(adaptive, deps_type=QualityTracker)

6. Smart Retry with Exponential Backoff

Retry same model with backoff for transient errors, fallback for permanent errors.

import asyncio

async def exponential_backoff_selector(ctx: AdaptiveContext) -> Model | None:
    if not ctx.attempts:
        return ctx.models[0]
    
    last = ctx.attempts[-1]
    is_transient = (
        last.exception and 
        hasattr(last.exception, 'status_code') and 
        500 <= last.exception.status_code < 600
    )
    
    if is_transient and ctx.attempt_number <= 5:
        # Exponential backoff: 1s, 2s, 4s, 8s
        await asyncio.sleep(2 ** (ctx.attempt_number - 2))
        return last.model  # Retry same model
    
    # Try different model
    tried = {id(a.model) for a in ctx.attempts}
    available = [m for m in ctx.models if id(m) not in tried]
    return available[0] if available else None

adaptive = AdaptiveModel(
    models=[primary, backup],
    selector=exponential_backoff_selector
)

7. Account Quota Management

Rotate across accounts based on remaining quota.

from dataclasses import dataclass
from datetime import datetime

@dataclass
class AccountPool:
    accounts: dict[str, dict]  # account_id -> {api_key, quota_limit, quota_remaining, reset_time}

    def get_available_account(self) -> str | None:
        now = datetime.now()
        for account_id, info in self.accounts.items():
            if now >= info['reset_time']:
                info['quota_remaining'] = info['quota_limit']
            if info['quota_remaining'] > 0:
                return account_id
        return None

from pydantic_ai.providers.openai import OpenAIProvider

# One model per account (a Model instance doesn't expose the API key it was
# configured with, so we keep an explicit account_id -> Model mapping)
models_by_account = {
    'acct1': OpenAIChatModel('gpt-4o', provider=OpenAIProvider(api_key=key1)),
    'acct2': OpenAIChatModel('gpt-4o', provider=OpenAIProvider(api_key=key2)),
    'acct3': OpenAIChatModel('gpt-4o', provider=OpenAIProvider(api_key=key3)),
}

def quota_rotation_selector(ctx: AdaptiveContext[AccountPool]) -> Model | None:
    pool = ctx.run_context.deps
    tried = {id(a.model) for a in ctx.attempts}

    account_id = pool.get_available_account()
    if not account_id:
        return None

    # Use the model configured for this account, unless it has already been tried
    model = models_by_account[account_id]
    return model if id(model) not in tried else None

adaptive = AdaptiveModel(
    models=list(models_by_account.values()),
    selector=quota_rotation_selector
)

Key Benefits

  1. Full Control: Custom logic for model selection based on any criteria
  2. Stateful Logic: Maintain state across calls (throttle timers, usage counts, quality metrics)
  3. Smart Waiting: Wait for throttled models instead of giving up
  4. Load Distribution: Balance across identical models on different accounts
  5. User-Aware: Access agent dependencies for user-specific routing
  6. Cost Optimization: Dynamic model selection based on cost, quality, and usage
  7. Complex Retry: Implement exponential backoff, circuit breakers, etc.

Comparison with FallbackModel

Feature            FallbackModel    AdaptiveModel
Model Selection    Sequential       Custom logic
State Management   External only    External + internal
Wait/Retry Logic   Not supported    Full control
Load Balancing     Not supported    Supported
User Context       Via callbacks    Direct access via RunContext
Complexity         Simple           Flexible

AdaptiveModel complements FallbackModel by supporting sophisticated routing scenarios, while FallbackModel remains the simpler choice for basic sequential fallback.

self,
messages: list[ModelMessage],
model_settings: ModelSettings | None,
model_request_parameters: ModelRequestParameters,
@ChuckJonas (Contributor, Author):

@DouweM is there a reason that run_context is available in request_stream but not request?

@DouweM (Collaborator):

Hmm, I added it to request_stream pre-v1 because we needed it for Temporal integration, and I really should've added it to request as well at the time; we can't do so anymore now, as it'd be a breaking change for users that have custom Model implementations 😬

Of course if we only support deps for request_stream, this AdaptiveModel will feel half-baked. Could we make it not depend on RunContext, and perhaps require the deps (or another context object) to be passed into the model explicitly?

@ChuckJonas (Contributor, Author) commented Nov 6, 2025:

we can't do so anymore now, as it'd be a breaking change for users that have custom Model implementations

Does Python not support optional params or signature overloads in a way that's backwards compatible? That's painful 😬

and perhaps require the deps (or another context object) to be passed into the model explicitly?

I'm trying to wrap my brain around the different use cases where context matters and what we give up by not having the RunContext. I guess if the Model and Agent need to share dependencies, then those objects can just be set up on each request and passed into both.

Honestly, there are probably more use cases for global AdaptiveModel state than for run-isolated state.

Looking at the RunContext, the only other property we're potentially giving up is the usage, which could maybe be useful for upgrading/downgrading the model based on input/output tokens. But maybe there's a simple workaround.

@DouweM (Collaborator) left a review:

@ChuckJonas Thanks Charlie, I like it!


class AdaptiveModel(Model, Generic[AgentDepsT]):
"""A model that uses custom logic to select which model to try next.

Unlike FallbackModel which tries models sequentially, AdaptiveModel gives
@DouweM (Collaborator):

Can you try refactoring FallbackModel as subclass of AdaptiveModel, to prove this is flexible enough (and simplify and deduplicate FallbackModel)?
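
For instance, something like this might work (a rough sketch against the proposed constructor, ignoring FallbackModel's other options):

from typing import Any

class FallbackModel(AdaptiveModel[Any]):
    def __init__(self, default_model: Model, *fallback_models: Model):
        models = [default_model, *fallback_models]

        def sequential_selector(ctx: AdaptiveContext[Any]) -> Model | None:
            # Try each model once, in order; None once all have failed
            tried = {id(a.model) for a in ctx.attempts}
            return next((m for m in models if id(m) not in tried), None)

        super().__init__(models=models, selector=sequential_selector)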

attempt_number += 1

# Check max attempts
if self._max_attempts is not None and attempt_number > self._max_attempts:
@DouweM (Collaborator):

We should reduce the duplication between request and request_stream

@DouweM (Collaborator):

Note that the issues linked in #3303 will also apply to this model, so I will fix it in FallbackModel before we merge this, and then we should make sure this also works properly with output modes.



@dataclass
class AttemptResult:
@DouweM (Collaborator):

I think just Attempt will be clear enough

exception: Exception | None
"""The exception raised by the model, if any."""

timestamp: float
@DouweM (Collaborator):

Let's store a full datetime

timestamp: float
"""Unix timestamp when the attempt was made."""

duration: float
@DouweM (Collaborator):

Maybe store a timedelta? Don't feel strongly about this one.

"""History of attempts in this request."""

attempt_number: int
"""Current attempt number (1-indexed)."""
@DouweM (Collaborator):

Wouldn't this always be len(attempts) + 1? In that case it could be a property, or be omitted entirely
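
e.g. (sketch):

@property
def attempt_number(self) -> int:
    return len(self.attempts) + 1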

# Call selector to get next model
model = await self._call_selector(context)

if model is None:
@DouweM (Collaborator):

Why do we allow the selector to return None if that will always raise an error? Couldn't the selector itself be required to return a Model, or itself raise an error if it can't?


# Try the selected model
start_time = time.time()
customized_params = model.customize_request_parameters(model_request_parameters)
@DouweM (Collaborator):

Note that we need to use prepare_request as we do in FallbackModel

@ChuckJonas (Contributor, Author):

@DouweM sorry I wasn't super clear, but this was really just a quick POC for validating that this abstraction supports different use cases. I hardly reviewed the internals at all 😅. Wanted to make sure you'd be open to the approach/contribution before putting too much effort into it.

One thought I had was to allow the AdaptiveModel to have its own generic ctx.state that would function similarly to Agent deps. That would allow it to be less opinionated; we'd instead only track the most fundamental state internally and provide lifecycle hooks. Not sure there is really any advantage here over simple inheritance (other than "composition over inheritance" as a design principle).
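
Something like this, purely illustrative (the state kwarg and ThrottleState are hypothetical; primary, backup, my_selector as in the examples above):

from dataclasses import dataclass, field

@dataclass
class ThrottleState:
    throttled_until: dict[int, float] = field(default_factory=dict)  # id(model) -> timestamp

adaptive = AdaptiveModel(
    models=[primary, backup],
    selector=my_selector,
    state=ThrottleState(),  # hypothetical: surfaced to the selector as ctx.state
)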

I'll take a proper pass on it as soon as I have some free cycles. If there are any other issues you want to make sure this approach covers, send them over and I'll make sure they are covered!

@DouweM (Collaborator) commented Nov 6, 2025:

sorry I wasn't super clear, but this was really just a quick POC for validating that this abstraction supports different use cases. I hardly reviewed the internals at all 😅. Wanted to make sure you'd be open to the approach/contribution before putting too much effort into it.

@ChuckJonas I realized that, I just couldn't help myself 😄

One thought I had was to allow the AdaptiveModel to have its own generic ctx.state that would function similarly to Agent deps. That would allow it to be less opinionated; we'd instead only track the most fundamental state internally and provide lifecycle hooks. Not sure there is really any advantage here over simple inheritance (other than "composition over inheritance" as a design principle).

It seems like people sometimes forget that they can just subclass our public classes, so generic state makes sense to me.

@gnaryak left a review:

Great to see this PR. You made my day.

- Inspect previous attempts via ctx.attempts
"""

models: Sequence[Model]
@gnaryak:

Why does the AdaptiveModel class have a models member? It seems to imply that the _selector function is choosing one from this sequence. What advantage does that give us? Could we make it optional? Why not just let the _selector function provide a model however it wants?

@ChuckJonas (Contributor, Author):

Ya, I've been debating this internally. It was mainly here because the design evolved organically from FallbackModel.

The biggest issue is having to know the models at initialization. Maybe there are instances where you actually want to init a model on the fly? (Although I'm struggling to come up with one in practice.)

What advantage does that give us?

The main things we lose by removing it:

  • it's potentially less "discoverable" (being able to introspect/reflect on the underlying models). Not sure where this would actually be needed in practice.
  • some of the model properties would be less specific (e.g. model_name). This probably only really impacts observability.

Could we make it optional?

If it's not critical, I'd lean more toward just removing it for overall consistency.

Basically this becomes a ModelProvider / Factory with some internal state tracking (arguably still too opinionated) and lifecycle hooks (I've since added on_failure/success hooks).

I do think there is a fine line between being "general purpose" and having "no purpose" 😅
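
E.g. dropping models would reduce the surface to roughly (illustrative only):

adaptive = AdaptiveModel(selector=select_model)  # select_model may return any Model, even one constructed on the fly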

@gnaryak:

Yea, when I first started exploring our use case, I was initially hoping that Agent might accept either a model factory or a model instance. So to me, the idea of this being a model factory sounds just fine. A model factory in model's clothing.
