From 161fcc99f3ecfedbe3a0caac670c86fa0f54493b Mon Sep 17 00:00:00 2001
From: Edward Funnekotter
Date: Thu, 25 Jul 2024 09:16:04 -0400
Subject: [PATCH] Two changes: added file_output component and a cleaner way
 for a component to discard a message (#22)

* Added file_output component

* Fixed timer_input component issue that caused the timer interval to shrink
  over time
---
 .github/workflows/ci.yml                     | 135 +++---------------
 .github/workflows/release.yaml               |  78 ++--------
 docs/components/file_output.md               |  29 ++++
 docs/components/index.md                     |   1 +
 .../langchain_chat_model_with_history.md     |   2 +
 docs/components/timer_input.md               |   2 +-
 pyproject.toml                               |  16 ---
 src/solace_ai_connector/common/message.py    |   7 +
 .../components/component_base.py             |  11 +-
 .../langchain_chat_model_with_history.py     |  59 ++++++--
 .../components/inputs_outputs/file_output.py |  57 ++++++++
 .../components/inputs_outputs/timer_input.py |   4 +-
 12 files changed, 186 insertions(+), 215 deletions(-)
 create mode 100644 docs/components/file_output.md
 create mode 100644 src/solace_ai_connector/components/inputs_outputs/file_output.py

diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml
index 00e94ee0..21e2b8b7 100644
--- a/.github/workflows/ci.yml
+++ b/.github/workflows/ci.yml
@@ -9,61 +9,35 @@ on:
 permissions:
   id-token: write
   checks: write
-  issues: read
   pull-requests: write
+  contents: write
 
 jobs:
-  test:
+  ci:
+    uses: SolaceDev/solace-public-workflows/.github/workflows/hatch_ci.yml@v1.0.0
+    secrets:
+      SONAR_TOKEN: ${{ secrets.SONAR_TOKEN }}
+      SONAR_HOST_URL: ${{ vars.SONAR_HOST_URL }}
+  structure-test:
+    name: Test Docker Image Structure
     runs-on: ubuntu-latest
-    env:
-      HATCH_CACHE_DIR: ${{ github.workspace }}/.hatch_cache
-      HATCH_DATA_DIR: ${{ github.workspace }}/.hatch_data
-
     steps:
-      - uses: actions/checkout@v4
+      - name: Checkout
+        uses: actions/checkout@v4
         with:
           fetch-depth: 0
+          ssh-key: ${{ secrets.COMMIT_KEY }}
 
-      - name: Install Hatch
-        uses: pypa/hatch@install
-
-      - name: Restore Hatch Directory
-        uses: actions/cache/restore@v4
-        id: cache-restore
-        with:
-          path: |
-            ${{ env.HATCH_CACHE_DIR }}
-            ${{ env.HATCH_DATA_DIR }}
-          key: ${{ runner.os }}-hatch-${{ hashFiles('pyproject.toml','requirements.txt') }}
-
-      - name: Install Dependencies
-        if: steps.cache-restore.outputs.cache-hit != 'true'
-        run: |
-          hatch python install 3.8 3.12
-
-      - name: Install Dependencies
-        if: steps.cache-restore.outputs.cache-hit != 'true'
-        run: |
-          hatch env create test
-
-      - name: Cache Hatch Directory
-        uses: actions/cache/save@v4
-        if: steps.cache-restore.outputs.cache-hit != 'true'
-        id: cache-hatch
-        with:
-          path: |
-            ${{ env.HATCH_CACHE_DIR }}
-            ${{ env.HATCH_DATA_DIR }}
-          key: ${{ runner.os }}-hatch-${{ hashFiles('pyproject.toml','requirements.txt') }}
+      - name: Set up Hatch
+        uses: SolaceDev/solace-public-workflows/.github/actions/hatch-setup@v1.0.0
 
-      - name: Set up Docker Buildx
+      - name: Set Up Docker Buildx
         id: builder
         uses: docker/setup-buildx-action@v3
 
       - name: Prepare env file
         run: |
           cp .env_template .env
-        shell: bash
 
       - name: Build Docker Image
         uses: docker/build-push-action@v6
         with:
           builder: ${{ steps.builder.outputs.name }}
           load: true
 
-      - name: Run Lint
-        continue-on-error: true
-        run: |
-          hatch run lint:ruff check -o lint.json --output-format json
-        shell: bash
-
-      - name: Run Structured Tests
-        run: |
-          hatch run +py=312 test:make structure-test
+      - name: Run Structure Tests
         shell: bash
-
-      - name: Run Unit Tests
-        shell: bash
-        run: |
-          hatch test --cover --all --parallel --junitxml=junit.xml
-
-      - name: Combine Coverage Reports
-        continue-on-error: true
         run: |
-          hatch run +py=312 test:coverage combine
-        shell: bash
-
-      - name: Report coverage
-        run: |
-          hatch run +py=312 test:coverage xml
-        shell: bash
-
-      - name: SonarQube Scan
-        if: always() && github.event_name != 'pull_request' || github.event.pull_request.head.repo.full_name == github.repository
-        uses: sonarsource/sonarqube-scan-action@v2.2.0
-        env:
-          SONAR_TOKEN: ${{ secrets.SONAR_TOKEN }}
-          SONAR_HOST_URL: ${{ vars.SONAR_HOST_URL }}
-        with:
-          args: >
-            -Dsonar.tests=tests/
-            -Dsonar.verbose=true
-            -Dsonar.sources=src/
-            -Dsonar.projectKey=${{github.repository_owner}}_${{github.event.repository.name}}
-            -Dsonar.python.coverage.reportPaths=coverage.xml
-            -Dsonar.python.ruff.reportPaths=lint.json
-
-      - name: SonarQube Quality Gate check
-        id: sonarqube-quality-gate-check
-        if: always() && github.event_name != 'pull_request' || github.event.pull_request.head.repo.full_name == github.repository
-        uses: sonarsource/sonarqube-quality-gate-action@master
-        env:
-          SONAR_TOKEN: ${{ secrets.SONAR_TOKEN }}
-          SONAR_HOST_URL: ${{ vars.SONAR_HOST_URL }}
-
-      # Build and verify packages
-      - name: Build
-        run: hatch build
-
-      - name: Verify Packages
-        run: |
-          ls dist/*.tar.gz | hatch run +py=312 test:xargs -n1 twine check
-          ls dist/*.whl | hatch run +py=312 test:xargs -n1 twine check
-        shell: bash
-
-      - name: Surface failing tests
-        if: always()
-        uses: pmeier/pytest-results-action@main
-        with:
-          # A list of JUnit XML files, directories containing the former, and wildcard
-          # patterns to process.
-          # See @actions/glob for supported patterns.
-          path: junit.xml
-
-          # (Optional) Add a summary of the results at the top of the report
-          summary: true
-
-          # (Optional) Select which results should be included in the report.
-          # Follows the same syntax as `pytest -r`
-          display-options: fEX
-
-          # (Optional) Fail the workflow if no JUnit XML was found.
-          fail-on-empty: true
-
-          # (Optional) Title of the test results section in the workflow summary
-          title: Unit Test results
+          hatch run make structure-test
diff --git a/.github/workflows/release.yaml b/.github/workflows/release.yaml
index 79c1343f..f4ab1235 100644
--- a/.github/workflows/release.yaml
+++ b/.github/workflows/release.yaml
@@ -7,71 +7,19 @@ on:
         required: true
         description: "Version bump type"
         options:
-        - patch
-        - minor
-        - major
+          - patch
+          - minor
+          - major
+
+permissions:
+  id-token: write
+  checks: write
 
 jobs:
   release:
-    name: Release
-    timeout-minutes: 20
-    runs-on: ubuntu-latest
-    environment:
-      name: pypi
-      url: https://pypi.org/p/solace_ai_connector
-    permissions:
-      id-token: write
-      contents: write
-
-    steps:
-      - name: Checkout
-        uses: actions/checkout@v4
-        with:
-          fetch-depth: 0
-          ssh-key: ${{ secrets.COMMIT_KEY }}
-
-      - name: Set up Python
-        uses: actions/setup-python@v5
-        with:
-          python-version: '3.x'
-
-      - name: Install hatch
-        run: |
-          pip install --upgrade pip
-          pip install hatch
-
-      - name: Get Current Version
-        run: |
-          CURRENT_VERSION=$(hatch version)
-          echo "CURRENT_VERSION=${CURRENT_VERSION}" >> $GITHUB_ENV
-
-      - name: Fail if the current version doesn't exist
-        if: env.CURRENT_VERSION == ''
-        run: exit 1
-
-      - name: Build project for distribution
-        run: hatch build
-
-      - name: Publish package distributions to PyPI
-        uses: pypa/gh-action-pypi-publish@release/v1
-
-      - name: Create Release
-        uses: ncipollo/release-action@v1
-        with:
-          artifacts: "dist/*.whl"
-          makeLatest: true
-          generateReleaseNotes: true
-          tag: ${{ env.CURRENT_VERSION }}
-
-      - name: Bump Version
-        run: |
-          hatch version "${{ github.event.inputs.version }}"
-          NEW_VERSION=$(hatch version)
-          echo "NEW_VERSION=${NEW_VERSION}" >> $GITHUB_ENV
-
-      - name: Commit new version
-        run: |
-          git config --local user.email "action@github.com"
-          git config --local user.name "GitHub Action"
-          git commit -a -m "[ci skip] Bump version to $NEW_VERSION"
-          git push
+    uses: SolaceDev/solace-public-workflows/.github/workflows/hatch_release_pypi.yml@v1.0.0
+    with:
+      version: ${{ github.event.inputs.version }}
+      pypi-project: solace-ai-connector
+    secrets:
+      COMMIT_KEY: ${{ secrets.COMMIT_KEY }}
diff --git a/docs/components/file_output.md b/docs/components/file_output.md
new file mode 100644
index 00000000..b0d5d2e8
--- /dev/null
+++ b/docs/components/file_output.md
@@ -0,0 +1,29 @@
+# FileOutput
+
+File output component
+
+## Configuration Parameters
+
+```yaml
+component_name: <user-supplied-name>
+component_module: file_output
+component_config:
+```
+
+No configuration parameters
+
+
+## Component Input Schema
+
+```
+{
+  content: <string>,
+  file_path: <string>,
+  mode: <string>
+}
+```
+| Field | Required | Description |
+| --- | --- | --- |
+| content | True |  |
+| file_path | True | The path to the file to write to |
+| mode | False | The mode to open the file in: w (write), a (append). Default is w. |
diff --git a/docs/components/index.md b/docs/components/index.md
index 0fa9b6d5..a236f014 100644
--- a/docs/components/index.md
+++ b/docs/components/index.md
@@ -7,6 +7,7 @@
 | [broker_output](broker_output.md) | Connect to a messaging broker and send messages to it. Note that this component requires that the data is transformed into the input schema. |
 | [delay](delay.md) | A simple component that simply passes the input to the output, but with a configurable delay. |
 | [error_input](error_input.md) | Receive processing errors from the Solace AI Event Connector. Note that the component_input configuration is ignored. This component should be used to create a flow that handles errors from other flows. |
+| [file_output](file_output.md) | File output component |
 | [iterate](iterate.md) | Take a single message that is a list and output each item in that list as a separate message |
 | [langchain_chat_model](langchain_chat_model.md) | Provide access to all the LangChain chat models via configuration |
 | [langchain_chat_model_with_history](langchain_chat_model_with_history.md) | A chat model based on LangChain that includes keeping per-session history of the conversation. Note that this component will only take the first system message and the first human message in the messages array. |
diff --git a/docs/components/langchain_chat_model_with_history.md b/docs/components/langchain_chat_model_with_history.md
index 04ef74b1..100e4298 100644
--- a/docs/components/langchain_chat_model_with_history.md
+++ b/docs/components/langchain_chat_model_with_history.md
@@ -21,6 +21,7 @@ component_config:
   stream_to_flow: <string>
   llm_mode: <string>
   stream_batch_size: <string>
+  set_response_uuid_in_user_properties: <boolean>
 ```
 
 | Parameter | Required | Default | Description |
@@ -38,6 +39,7 @@ component_config:
 | stream_to_flow | False |  | Name the flow to stream the output to - this must be configured for llm_mode='stream'. |
 | llm_mode | False |  | The mode for streaming results: 'sync' or 'stream'. 'stream' will just stream the results to the named flow. 'none' will wait for the full response. |
 | stream_batch_size | False | 15 | The minimum number of words in a single streaming result. Default: 15. |
+| set_response_uuid_in_user_properties | False | False | Whether to set the response_uuid in the user_properties of the input_message. This will allow other components to correlate streaming chunks with the full response. |
 
 
 ## Component Input Schema
diff --git a/docs/components/timer_input.md b/docs/components/timer_input.md
index 0b35cbc1..f3ac656e 100644
--- a/docs/components/timer_input.md
+++ b/docs/components/timer_input.md
@@ -22,5 +22,5 @@ component_config:
 ## Component Output Schema
 
 ```
-<any>
+<None>
 ```
diff --git a/pyproject.toml b/pyproject.toml
index e3154b17..7c1f63a9 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -42,22 +42,6 @@ packages = ["src/solace_ai_connector"]
 [tool.hatch.version]
 path = "src/solace_ai_connector/__init__.py"
 
-[tool.hatch.envs.test]
-dependencies = [
-    "pytest>=8.2.2",
-    "coverage>=7.5.4",
-    "twine>=5.1.1",
-]
-
-[tool.hatch.envs.lint]
-detached = true
-dependencies = [
-    "ruff>=0.5.0",
-]
-
 [tool.ruff]
 lint.select = ["E4", "E7", "E9", "F"]
 lint.ignore = ["F401", "E731"]
-
-[[tool.hatch.envs.test.matrix]]
-python = ["38", "312"]
diff --git a/src/solace_ai_connector/common/message.py b/src/solace_ai_connector/common/message.py
index 3182d51f..73c0fbf6 100644
--- a/src/solace_ai_connector/common/message.py
+++ b/src/solace_ai_connector/common/message.py
@@ -28,6 +28,7 @@ def __init__(self, payload=None, topic=None, user_properties=None):
     #   <data_type>:<data_name>
     # Where:
     #   <data_type> is one of the following:
+    #   input - Object containing the payload, topic, and user_properties
     #   input.payload - The payload of the message
     #   input.topic - The topic of the message as a string
     #   input.topic_levels - The topic of the message as a list of each level of the topic
@@ -99,6 +100,12 @@ def get_data_object(
     ):
         data_type = expression.split(":")[0]
 
+        if data_type == "input":
+            return {
+                "payload": self.payload,
+                "topic": self.topic,
+                "user_properties": self.user_properties,
+            }
         if data_type == "input.payload":
             return self.payload
         if data_type == "input.topic":
diff --git a/src/solace_ai_connector/components/component_base.py b/src/solace_ai_connector/components/component_base.py
index aef1aa36..1783a073 100644
--- a/src/solace_ai_connector/components/component_base.py
+++ b/src/solace_ai_connector/components/component_base.py
@@ -46,6 +46,7 @@ def __init__(self, module_info, **kwargs):
         self.need_acknowledgement = False
         self.stop_thread_event = threading.Event()
         self.current_message = None
+        self.current_message_has_been_discarded = False
 
         self.log_identifier = f"[{self.instance_name}.{self.flow_name}.{self.name}] "
 
@@ -159,9 +160,13 @@ def process_message(self, message):
             self.trace_data(data)
 
         # Invoke the component
+        self.current_message_has_been_discarded = False
         result = self.invoke(message, data)
 
-        if result is not None:
+        if self.current_message_has_been_discarded:
+            # Call the message acknowledgements
+            message.call_acknowledgements()
+        elif result is not None:
             # Do all the things we need to do after invoking the component
             # Note that there are times where we don't want to
             # send the message to the next component
@@ -193,6 +198,10 @@ def process_post_invoke(self, result, message):
         self.current_message = message
         self.send_message(message)
 
+    def discard_current_message(self):
+        # If the message is to be discarded, we need to acknowledge any previous components
+        self.current_message_has_been_discarded = True
+
     def get_acknowledgement_callback(self):
         # This should be overridden by the component if it needs to acknowledge messages
         return None
diff --git a/src/solace_ai_connector/components/general/langchain/langchain_chat_model_with_history.py b/src/solace_ai_connector/components/general/langchain/langchain_chat_model_with_history.py
index 02ea5962..e9988db5 100644
--- a/src/solace_ai_connector/components/general/langchain/langchain_chat_model_with_history.py
+++ b/src/solace_ai_connector/components/general/langchain/langchain_chat_model_with_history.py
@@ -3,6 +3,7 @@
 import threading
 from collections import namedtuple
 from copy import deepcopy
+from uuid import uuid4
 
 from langchain_core.chat_history import BaseChatMessageHistory
 from langchain_core.runnables.history import RunnableWithMessageHistory
@@ -86,6 +87,13 @@
         "description": "The minimum number of words in a single streaming result. Default: 15.",
         "default": 15,
     },
+    {
+        "name": "set_response_uuid_in_user_properties",
+        "required": False,
+        "description": "Whether to set the response_uuid in the user_properties of the input_message. This will allow other components to correlate streaming chunks with the full response.",
+        "default": False,
+        "type": "boolean",
+    },
 ]
 )
 info["input_schema"]["properties"]["session_id"] = {
@@ -114,6 +122,9 @@ def __init__(self, **kwargs):
         self.stream_to_flow = self.get_config("stream_to_flow", "")
         self.llm_mode = self.get_config("llm_mode", "none")
         self.stream_batch_size = self.get_config("stream_batch_size", 15)
+        self.set_response_uuid_in_user_properties = self.get_config(
+            "set_response_uuid_in_user_properties", False
+        )
 
     def invoke_model(
         self, input_message, messages, session_id=None, clear_history=False
@@ -161,6 +172,8 @@ def invoke_model(
 
         aggregate_result = ""
         current_batch = ""
+        response_uuid = str(uuid4())
+        first_chunk = True
 
         for chunk in runnable.stream(
             {"input": human_message},
             config={
@@ -172,25 +185,50 @@ def invoke_model(
             if len(current_batch.split()) >= self.stream_batch_size:
                 if self.stream_to_flow:
                     self.send_streaming_message(
-                        input_message, current_batch, aggregate_result
+                        input_message,
+                        current_batch,
+                        aggregate_result,
+                        response_uuid,
+                        first_chunk,
                     )
                 current_batch = ""
+                first_chunk = False
 
-        if current_batch:
-            if self.stream_to_flow:
-                self.send_streaming_message(
-                    input_message, current_batch, aggregate_result
-                )
+        if self.stream_to_flow:
+            self.send_streaming_message(
+                input_message,
+                current_batch,
+                aggregate_result,
+                response_uuid,
+                first_chunk,
+                True,
+            )
 
-        result = namedtuple("Result", ["content"])(aggregate_result)
+        result = namedtuple("Result", ["content", "uuid"])(
+            aggregate_result, response_uuid
+        )
 
         self.prune_large_message_from_history(session_id)
 
         return result
 
-    def send_streaming_message(self, input_message, chunk, aggregate_result):
+    def send_streaming_message(
+        self,
+        input_message,
+        chunk,
+        aggregate_result,
+        response_uuid,
+        first_chunk=False,
+        last_chunk=False,
+    ):
         message = Message(
-            payload={"chunk": chunk, "aggregate_result": aggregate_result},
+            payload={
+                "chunk": chunk,
+                "aggregate_result": aggregate_result,
+                "response_uuid": response_uuid,
+                "first_chunk": first_chunk,
+                "last_chunk": last_chunk,
+            },
             user_properties=input_message.get_user_properties(),
         )
         self.send_to_flow(self.stream_to_flow, message)
@@ -205,9 +243,6 @@ def create_history(self):
         )
         config = self.get_config("history_config", {})
         history = self.create_component(config, history_class)
-        # memory = ConversationTokenBufferMemory(
-        #     chat_memory=history, llm=self.component, max_token_limit=history_max_tokens
-        # )
         return history
 
     def get_history(self, session_id: str) -> BaseChatMessageHistory:
diff --git a/src/solace_ai_connector/components/inputs_outputs/file_output.py b/src/solace_ai_connector/components/inputs_outputs/file_output.py
new file mode 100644
index 00000000..f25f2474
--- /dev/null
+++ b/src/solace_ai_connector/components/inputs_outputs/file_output.py
@@ -0,0 +1,57 @@
+# An output component to write to a file
+import pprint
+
+from ..component_base import ComponentBase
+
+info = {
+    "class_name": "FileOutput",
+    "description": "File output component",
+    "config_parameters": [],
+    "input_schema": {
+        "type": "object",
+        "properties": {
+            "content": {
+                "type": "string",
+            },
+            "file_path": {
+                "description": "The path to the file to write to",
+                "type": "string",
+            },
+            "mode": {
+                "description": "The mode to open the file in: w (write), a (append). Default is w.",
+                "type": "string",
+                "default": "w",
+            },
+        },
+        "required": ["content", "file_path"],
+    },
+}
+
+
+class FileOutput(ComponentBase):
+    def __init__(self, **kwargs):
+        super().__init__(info, **kwargs)
+
+    def invoke(self, message, data):
+        content = data["content"]
+        file_path = data["file_path"]
+        mode = data.get("mode", "w")
+
+        if not file_path:
+            raise ValueError(
+                f"file_path is required for file_output component. {self.log_identifier}"
+            )
+
+        if mode not in ["w", "a"]:
+            raise ValueError(
+                f"mode must be either 'w' (write) or 'a' (append). {self.log_identifier}"
+            )
+
+        if content:
+            with open(file_path, mode, encoding="utf-8") as f:
+                if isinstance(content, str):
+                    f.write(content)
+                else:
+                    pprint.pprint(content, stream=f, width=160)
+
+        return data
diff --git a/src/solace_ai_connector/components/inputs_outputs/timer_input.py b/src/solace_ai_connector/components/inputs_outputs/timer_input.py
index b9542ff9..756d44f3 100644
--- a/src/solace_ai_connector/components/inputs_outputs/timer_input.py
+++ b/src/solace_ai_connector/components/inputs_outputs/timer_input.py
@@ -29,7 +29,9 @@
             "required": False,
         },
     ],
-    "output_schema": "any",
+    "output_schema": {
+        "type": "None",
+    },
 }
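
As a quick orientation for reviewers, here is a minimal sketch of a flow configuration that exercises the new file_output component. It assumes the connector's usual flow conventions; the flow name, the stdin_input source, and the /tmp/output.log path are illustrative choices for this example, not part of the patch:

```yaml
# Hypothetical flow: append each line read from stdin to /tmp/output.log.
# file_output expects `content` and `file_path` (plus an optional `mode`)
# in its input data, per the input_schema added above.
flows:
  - name: stdin_to_file
    components:
      - component_name: stdin
        component_module: stdin_input
      - component_name: file_writer
        component_module: file_output
        input_transforms:
          - type: copy
            source_expression: input.payload
            dest_expression: user_data.output:content
          - type: copy
            source_expression: static:/tmp/output.log
            dest_expression: user_data.output:file_path
          - type: copy
            source_expression: static:a
            dest_expression: user_data.output:mode
        component_input:
          source_expression: user_data.output
```

On the discard change: a component built on ComponentBase can now call self.discard_current_message() inside invoke() to drop the in-flight message while still firing the acknowledgements owed to upstream components, instead of returning None and leaving those acknowledgements unfired.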