diff --git a/src/hayhooks/server/pipelines/utils.py b/src/hayhooks/server/pipelines/utils.py index 1c8ab1a..eb2aa7e 100644 --- a/src/hayhooks/server/pipelines/utils.py +++ b/src/hayhooks/server/pipelines/utils.py @@ -36,26 +36,28 @@ def get_last_user_message(messages: List[Union[Message, Dict]]) -> Union[str, No return None +def send_message(message: str, streaming_callback: Callable[[StreamingChunk], None]): + if streaming_callback is not None: + streaming_callback(StreamingChunk(content=message)) -def find_streaming_component(pipeline: Union[Pipeline, AsyncPipeline]) -> Tuple[Component, str]: +def find_streaming_components(pipeline: Union[Pipeline, AsyncPipeline]) -> List[Tuple[Component, str]]: """ - Finds the component in the pipeline that supports streaming_callback + Finds all components in the pipeline that support streaming_callback Returns: - The first component that supports streaming + A list of tuples (component, component_name) for each component that supports streaming """ - streaming_component = None - streaming_component_name = "" + streaming_components = [] for name, component in pipeline.walk(): if hasattr(component, "streaming_callback"): log.trace(f"Streaming component found in '{name}' with type {type(component)}") - streaming_component = component - streaming_component_name = name - if not streaming_component: - raise ValueError("No streaming-capable component found in the pipeline") + streaming_components.append((component, name)) - return streaming_component, streaming_component_name + if not streaming_components: + raise ValueError("No streaming-capable components found in the pipeline") + + return streaming_components def _setup_streaming_callback_for_pipeline( @@ -72,16 +74,17 @@ def _setup_streaming_callback_for_pipeline( Returns: Updated pipeline run arguments """ - _, streaming_component_name = find_streaming_component(pipeline) + streaming_components = find_streaming_components(pipeline) - # Ensure component args exist in pipeline run args - if streaming_component_name not in pipeline_run_args: - pipeline_run_args[streaming_component_name] = {} + for streaming_component, streaming_component_name in streaming_components: + # Ensure component args exist in pipeline run args + if streaming_component_name not in pipeline_run_args: + pipeline_run_args[streaming_component_name] = {} - # Set the streaming callback on the component - streaming_component = pipeline.get_component(streaming_component_name) - assert hasattr(streaming_component, "streaming_callback") - streaming_component.streaming_callback = streaming_callback + # Set the streaming callback on the component + component = pipeline.get_component(streaming_component_name) + assert hasattr(component, "streaming_callback") + component.streaming_callback = streaming_callback return pipeline_run_args @@ -232,15 +235,20 @@ def _validate_async_streaming_support(pipeline: Union[Pipeline, AsyncPipeline]) Raises: ValueError: If the pipeline doesn't support async streaming """ - streaming_component, streaming_component_name = find_streaming_component(pipeline) + streaming_components = find_streaming_components(pipeline) + unsupported_components = [] + + for streaming_component, streaming_component_name in streaming_components: + # Check if the streaming component supports async streaming callbacks + if not hasattr(streaming_component, "run_async"): + component_type = type(streaming_component).__name__ + unsupported_components.append(f"Component '{streaming_component_name}' of type {component_type})") - # Check if the streaming component supports async streaming callbacks - # We check for run_async method as an indicator of async support - if not hasattr(streaming_component, "run_async"): - component_type = type(streaming_component).__name__ + if unsupported_components: + component_list = ", ".join(unsupported_components) raise ValueError( - f"Component '{streaming_component_name}' of type '{component_type}' seems to not support async streaming callbacks. " - f"Use the sync 'streaming_generator' function instead, or switch to a component that supports async streaming callbacks " + f"The following components do not seem to support async streaming callbacks: {component_list}. " + f"Use the sync 'streaming_generator' function instead, or switch to components that support async streaming callbacks " f"(e.g., OpenAIChatGenerator instead of OpenAIGenerator)." )