
Conversation


@Pouyanpi Pouyanpi commented Nov 18, 2025

Description

Adds async streaming support to the patched ChatNVIDIA provider, enabling stream_async() to stream properly with NIM models. Prior to this fix, stream_async() with the NIM engine would block until the full generation completed and then yield all content at once, rather than streaming incrementally.

Problem

Before this fix, async streaming tests would fail because chunks weren't streamed incrementally:

import time

from nemoguardrails import LLMRails, RailsConfig


async def test_ttft(engine, model):
    config = RailsConfig.from_content(yaml_content=f"""
models:
  - type: main
    engine: {engine}
    model: {model}
streaming: True
""")
    rails = LLMRails(config)
    chunk_times = [time.time()]
    async for chunk in rails.stream_async(messages=[...]):
        chunk_times.append(time.time())

    ttft = chunk_times[1] - chunk_times[0]  # time to first token
    total_time = chunk_times[-1] - chunk_times[0]

    # this assertion fails without the fix:
    assert ttft < (total_time / 2), "TTFT should be less than half of total time"

For example:

await test_ttft(model="meta/llama-3.3-70b-instruct", engine="nim")
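To run this check from a plain script rather than inside an async test runner, the coroutine can be driven with asyncio.run, using the same arguments as the call above:

import asyncio

asyncio.run(test_ttft(model="meta/llama-3.3-70b-instruct", engine="nim"))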

Tested on all supported langchain-nvidia-ai-endpoints versions with the fix applied:

  Version         TTFT (s)     Total Time (s)
 ------------------------------------------
 0.3.1           0.473        2.875
 0.3.2           0.498        2.797
 0.3.3           0.476        2.745
 0.3.4           0.450        2.507
 0.3.5           0.489        2.760
 0.3.6           0.461        2.787
 0.3.7           0.444        2.886
 0.3.8           0.432        2.831
 0.3.9           0.479        2.794
 0.3.10          0.440        2.426
 0.3.11          0.433        2.909
 0.3.12          0.471        2.692
 0.3.13          0.455        2.603
 0.3.14          0.474        2.825
 0.3.15          0.444        2.488
 0.3.16          0.473        2.645
 0.3.17          0.432        2.857
 0.3.18          0.473        3.438
 0.3.19          0.458        2.824

Prior to the fix:

  Version         Error Type                Error Message
  --------------------------------------------------------------------------------
  0.3.0           AssertionError            TTFT (3.1635499000549316) should be l...
  0.3.1           AssertionError            TTFT (2.7374980449676514) should be l...
  0.3.2           AssertionError            TTFT (2.588848829269409) should be le...
  0.3.3           AssertionError            TTFT (2.7798001766204834) should be l...
  0.3.4           AssertionError            TTFT (2.870666980743408) should be le...
  0.3.5           AssertionError            TTFT (2.900015115737915) should be le...
  0.3.6           AssertionError            TTFT (3.027787923812866) should be le...
  0.3.7           AssertionError            TTFT (2.741342782974243) should be le...
  0.3.8           AssertionError            TTFT (2.8514771461486816) should be l...
  0.3.9           AssertionError            TTFT (3.0388290882110596) should be l...
  0.3.10          AssertionError            TTFT (3.1536900997161865) should be l...
  0.3.11          AssertionError            TTFT (2.754746913909912) should be le...
  0.3.12          AssertionError            TTFT (2.7870500087738037) should be l...
  0.3.13          AssertionError            TTFT (2.575707197189331) should be le..
  0.3.14          AssertionError            TTFT (2.9357969760894775) should be l...
  0.3.15          AssertionError            TTFT (2.972874879837036) should be le...
  0.3.16          AssertionError            TTFT (2.6601181030273438) should be l...
  0.3.17          AssertionError            TTFT (3.1566638946533203) should be l...
  0.3.18          AssertionError            TTFT (3.070240020751953) should be le...
  0.3.19          RuntimeError              list index out of range

In 0.3.19, async API support was added to ChatNVIDIA, which is why we get a different error there. This fix handles that case as well.

To run the live tests:

LIVE_TEST_MODE=1 poetry run pytest tests/llm_providers/test_langchain_nvidia_ai_endpoints_patch.py -v

Ensure langchain-nvidia-ai-endpoints is installed and NVIDIA_API_KEY is set.

@Pouyanpi Pouyanpi force-pushed the feat/chatnvidia-async-streaming branch 2 times, most recently from f06d0d5 to b7e314b on November 18, 2025 at 13:34
@Pouyanpi Pouyanpi changed the base branch from revert/chatnvidia-custom-headers to develop on November 18, 2025 at 13:34
Enables stream_async() to work with ChatNVIDIA/NIM models by
implementing async streaming decorator and _agenerate method. Prior to
this fix, stream_async() would fail with NIM engine configurations.
@Pouyanpi Pouyanpi force-pushed the feat/chatnvidia-async-streaming branch from b7e314b to 19ddac4 on November 18, 2025 at 13:43
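The commit message above describes the fix as an async streaming decorator plus an _agenerate method. For context, here is a minimal sketch of that pattern, not the actual patch source: the class name PatchedChatNVIDIA and the way the streaming flag is detected are assumptions for illustration, while agenerate_from_stream and AsyncCallbackManagerForLLMRun are real langchain-core imports. The idea is that when streaming is requested, _agenerate consumes self._astream(...) and aggregates the chunks instead of taking the blocking, non-streaming path.

from functools import wraps
from typing import Any, List, Optional

from langchain_core.callbacks import AsyncCallbackManagerForLLMRun
from langchain_core.language_models.chat_models import agenerate_from_stream
from langchain_core.messages import BaseMessage
from langchain_core.outputs import ChatResult
from langchain_nvidia_ai_endpoints import ChatNVIDIA


def async_stream_decorator(func):
    """Sketch only: route _agenerate through the async stream when streaming is on."""

    @wraps(func)
    async def wrapper(self, messages, stop=None, run_manager=None, **kwargs):
        # Assumption for this sketch: the streaming flag comes from the call kwargs
        # or the model config; the real patch may decide this differently.
        should_stream = kwargs.pop("streaming", getattr(self, "streaming", False))
        if should_stream:
            stream_iter = self._astream(
                messages, stop=stop, run_manager=run_manager, **kwargs
            )
            # agenerate_from_stream aggregates the chunks into a single ChatResult,
            # while the underlying _astream still emits tokens incrementally.
            return await agenerate_from_stream(stream_iter)
        # Otherwise fall back to the regular non-streaming generation.
        return await func(self, messages, stop=stop, run_manager=run_manager, **kwargs)

    return wrapper


class PatchedChatNVIDIA(ChatNVIDIA):  # hypothetical name, for illustration only
    @async_stream_decorator
    async def _agenerate(
        self,
        messages: List[BaseMessage],
        stop: Optional[List[str]] = None,
        run_manager: Optional[AsyncCallbackManagerForLLMRun] = None,
        **kwargs: Any,
    ) -> ChatResult:
        return await super()._agenerate(
            messages=messages, stop=stop, run_manager=run_manager, **kwargs
        )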

@tgasser-nv tgasser-nv left a comment


Thanks for fixing this. I'm really surprised streaming with the nim engine type has been broken the whole time (unless I misunderstood?). Here are a couple of follow-up actions:

  1. Before merging, can you run this yourself locally (using curl or similar) to make sure the chunks stream correctly after the fix?
  2. How was this not caught in QA in a streaming test until now? We need a test in QA that fails prior to this fix and passes after it. That will prevent regressions.
  3. What is the guidance on LIVE_TEST_MODE? Should we run it as part of the test plan for each PR? I usually just run pytest tests -q and don't enable LIVE_TEST_MODE.

@Pouyanpi Pouyanpi added this to the v0.19.0 milestone Nov 19, 2025

codecov bot commented Nov 19, 2025

Codecov Report

✅ All modified and coverable lines are covered by tests.



greptile-apps bot commented Nov 19, 2025

Greptile Summary

  • Added async streaming support by implementing async_stream_decorator and overriding _agenerate to enable incremental token streaming for NIM models
  • Comprehensive test suite validates streaming behavior, TTFT metrics, and integration with LLMRails across multiple langchain-nvidia-ai-endpoints versions

Confidence Score: 4/5

  • This PR is safe to merge, with a minor parameter inconsistency to address.
  • The implementation correctly mirrors the sync streaming decorator pattern and adds comprehensive test coverage. The only concern is a parameter inconsistency between the sync and async methods that could cause problems if callers expect uniform APIs.
  • Pay attention to _langchain_nvidia_ai_endpoints_patch.py: the async _agenerate method should include the callbacks parameter for API consistency with the sync _generate method.

Important Files Changed

Filename: nemoguardrails/llm/providers/_langchain_nvidia_ai_endpoints_patch.py
Overview: Added async streaming support via the new async_stream_decorator and _agenerate method, plus a callbacks parameter on the sync _generate for API consistency

Sequence Diagram

sequenceDiagram
    participant User
    participant LLMRails
    participant ChatNVIDIA
    participant async_stream_decorator
    participant _astream
    participant agenerate_from_stream
    participant NIM

    User->>LLMRails: "stream_async(messages)"
    LLMRails->>ChatNVIDIA: "_agenerate(messages, streaming=True)"
    ChatNVIDIA->>async_stream_decorator: "wrapper called"
    async_stream_decorator->>async_stream_decorator: "check should_stream=True"
    async_stream_decorator->>_astream: "self._astream(messages)"
    _astream->>NIM: "send request"
    loop For each token
        NIM-->>_astream: "yield chunk"
        _astream-->>async_stream_decorator: "yield chunk"
    end
    async_stream_decorator->>agenerate_from_stream: "await agenerate_from_stream(stream_iter)"
    agenerate_from_stream->>agenerate_from_stream: "aggregate chunks"
    agenerate_from_stream-->>async_stream_decorator: "ChatResult"
    async_stream_decorator-->>ChatNVIDIA: "ChatResult"
    ChatNVIDIA-->>LLMRails: "ChatResult"
    LLMRails-->>User: "streamed response"


@greptile-apps greptile-apps bot left a comment


2 files reviewed, no comments


@Pouyanpi Pouyanpi force-pushed the feat/chatnvidia-async-streaming branch from 8a2507b to 71f6e7c on November 19, 2025 at 10:59

@greptile-apps greptile-apps bot left a comment


2 files reviewed, 1 comment


Comment on lines +149 to 158
async def _agenerate(
    self,
    messages: List[BaseMessage],
    stop: Optional[List[str]] = None,
    run_manager: Optional[AsyncCallbackManagerForLLMRun] = None,
    **kwargs: Any,
) -> ChatResult:
    return await super()._agenerate(
        messages=messages, stop=stop, run_manager=run_manager, **kwargs
    )

logic: The async _agenerate method is missing the callbacks parameter that was added to the sync _generate (line 137). This inconsistency may cause issues if callers expect a uniform API.

Suggested change
Original:

async def _agenerate(
    self,
    messages: List[BaseMessage],
    stop: Optional[List[str]] = None,
    run_manager: Optional[AsyncCallbackManagerForLLMRun] = None,
    **kwargs: Any,
) -> ChatResult:
    return await super()._agenerate(
        messages=messages, stop=stop, run_manager=run_manager, **kwargs
    )

Suggested:

@async_stream_decorator
async def _agenerate(
    self,
    messages: List[BaseMessage],
    stop: Optional[List[str]] = None,
    run_manager: Optional[AsyncCallbackManagerForLLMRun] = None,
    callbacks: Callbacks = None,
    **kwargs: Any,
) -> ChatResult:
    return await super()._agenerate(
        messages=messages, stop=stop, run_manager=run_manager, callbacks=callbacks, **kwargs
    )
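A side note on the suggestion (my addition, not part of the bot comment): the Callbacks annotation it uses needs to be importable in the patch module, typically via the line below; if the sync _generate already gained a callbacks parameter, the import is presumably in place already.

from langchain_core.callbacks import Callbacks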
