diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml new file mode 100644 index 00000000..425048ff --- /dev/null +++ b/.github/workflows/ci.yml @@ -0,0 +1,156 @@ +name: CI +on: + push: + branches: + - main + pull_request: + types: [opened, synchronize] + +permissions: + id-token: write + checks: write + issues: read + pull-requests: write + +jobs: + test: + runs-on: ubuntu-latest + env: + HATCH_CACHE_DIR: ${{ github.workspace }}/.hatch_cache + HATCH_DATA_DIR: ${{ github.workspace }}/.hatch_data + + steps: + - uses: actions/checkout@v4 + with: + fetch-depth: 0 + + - name: Install Hatch + uses: pypa/hatch@install + + - name: Restore Hatch Directory + uses: actions/cache/restore@v4 + id: cache-restore + with: + path: | + ${{ env.HATCH_CACHE_DIR }} + ${{ env.HATCH_DATA_DIR }} + key: ${{ runner.os }}-hatch-${{ hashFiles('pyproject.toml','requirements.txt') }} + + - name: Install Dependencies + if: steps.cache-restore.outputs.cache-hit != 'true' + run: | + hatch python install 3.8 3.12 + + - name: Install Dependencies + if: steps.cache-restore.outputs.cache-hit != 'true' + run: | + hatch env create test + + - name: Cache Hatch Directory + uses: actions/cache/save@v4 + if: steps.cache-restore.outputs.cache-hit != 'true' + id: cache-hatch + with: + path: | + ${{ env.HATCH_CACHE_DIR }} + ${{ env.HATCH_DATA_DIR }} + key: ${{ runner.os }}-hatch-${{ hashFiles('pyproject.toml','requirements.txt') }} + + - name: Set up Docker Buildx + id: builder + uses: docker/setup-buildx-action@v3 + + - name: Prepare env file + run: | + cp .env_template .env + shell: bash + + - name: Build Docker Image + uses: docker/build-push-action@v6 + with: + push: false + tags: solace/solace-ai-connector:local + platforms: linux/amd64 + builder: ${{ steps.builder.outputs.name }} + load: true + + - name: Run Lint + continue-on-error: true + run: | + hatch run +py=312 lint:ruff check -o lint.json --output-format json ./src ./tests + shell: bash + + - name: Run Structured Tests + run: | + hatch 
run +py=312 test:make structure-test + shell: bash + + - name: Run Unit Tests + shell: bash + run: | + hatch test --cover --all --parallel --junitxml=junit.xml + + - name: Combine Coverage Reports + continue-on-error: true + run: | + hatch run +py=312 test:coverage combine + shell: bash + + - name: Report coverage + run: | + hatch run +py=312 test:coverage xml + shell: bash + + - name: SonarQube Scan + if: always() + uses: sonarsource/sonarqube-scan-action@v2.2.0 + env: + SONAR_TOKEN: ${{ secrets.SONAR_TOKEN }} + SONAR_HOST_URL: ${{ vars.SONAR_HOST_URL }} + with: + args: > + -Dsonar.tests=tests/ + -Dsonar.verbose=true + -Dsonar.sources=src/ + -Dsonar.projectKey=${{github.repository_owner}}_${{github.event.repository.name}} + -Dsonar.python.coverage.reportPaths=coverage.xml + -Dsonar.python.ruff.reportPaths=lint.json + + - name: SonarQube Quality Gate check + id: sonarqube-quality-gate-check + uses: sonarsource/sonarqube-quality-gate-action@master + env: + SONAR_TOKEN: ${{ secrets.SONAR_TOKEN }} + SONAR_HOST_URL: ${{ vars.SONAR_HOST_URL }} + + # Build and verify packages + - name: Build + run: hatch build + + - name: Verify Packages + run: | + ls dist/*.tar.gz | hatch run +py=312 test:xargs -n1 twine check + ls dist/*.whl | hatch run +py=312 test:xargs -n1 twine check + shell: bash + + - name: Surface failing tests + if: always() + uses: pmeier/pytest-results-action@main + with: + # A list of JUnit XML files, directories containing the former, and wildcard + # patterns to process. + # See @actions/glob for supported patterns. + path: junit.xml + + # (Optional) Add a summary of the results at the top of the report + summary: true + + # (Optional) Select which results should be included in the report. + # Follows the same syntax as `pytest -r` + display-options: fEX + + # (Optional) Fail the workflow if no JUnit XML was found. 
+ fail-on-empty: true + + # (Optional) Title of the test results section in the workflow summary + title: Unit Test results diff --git a/.github/workflows/release.yaml b/.github/workflows/release.yaml new file mode 100644 index 00000000..f0b98f25 --- /dev/null +++ b/.github/workflows/release.yaml @@ -0,0 +1,74 @@ +name: Release +on: + workflow_dispatch: + inputs: + version: + type: choice + required: true + description: "Version bump type" + options: + - patch + - minor + - major + +jobs: + release: + name: Release + timeout-minutes: 20 + runs-on: ubuntu-latest + environment: + name: pypi + url: https://pypi.org/p/solace_ai_connector + permissions: + id-token: write + contents: write + + steps: + - name: Checkout + uses: actions/checkout@v4 + with: + fetch-depth: 0 + ssh-key: ${{ secrets.COMMIT_KEY }} + + - name: Set up Python + uses: actions/setup-python@v5 + with: + python-version: '3.x' + + - name: Install hatch + run: | + pip install --upgrade pip + pip install hatch + + - name: Bump Version + run: | + CURRENT_VERSION=$(hatch version) + echo "CURRENT_VERSION=${CURRENT_VERSION}" >> $GITHUB_ENV + hatch version "${{ github.event.inputs.version }}" + NEW_VERSION=$(hatch version) + echo "NEW_VERSION=${NEW_VERSION}" >> $GITHUB_ENV + + - name: Fail if the current version doesn't exist + if: env.CURRENT_VERSION == '' + run: exit 1 + + - name: Build project for distribution + run: hatch build + + - name: Publish package distributions to PyPI + uses: pypa/gh-action-pypi-publish@release/v1 + + - name: Create Release + uses: ncipollo/release-action@v1 + with: + artifacts: "dist/*.whl" + makeLatest: true + generateReleaseNotes: true + tag: ${{ env.NEW_VERSION }} + + - name: Commit new version + run: | + git config --local user.email "action@github.com" + git config --local user.name "GitHub Action" + git commit -a -m "[ci skip] Bump version to $NEW_VERSION" + git push diff --git a/Dockerfile b/Dockerfile index f18f7828..2f919ea6 100644 --- a/Dockerfile +++ b/Dockerfile
@@ -7,7 +7,9 @@ RUN apt-get update && \ apt-get clean #Install main program -COPY . /app +COPY /src /app/src +COPY requirements.txt /app + RUN python3.10 -m pip install -r requirements.txt ENV PYTHONUNBUFFERED=1 diff --git a/Makefile b/Makefile index 57f406eb..94518aaa 100644 --- a/Makefile +++ b/Makefile @@ -2,7 +2,7 @@ include .env VERSION ?= local gen-docs: - @python3 src/tools/gen_component_docs.py + @python3 src/solace_ai_connector/tools/gen_component_docs.py build-pypi: @python3 -m build diff --git a/docs/components/index.md b/docs/components/index.md index a6b7cd54..0fa9b6d5 100644 --- a/docs/components/index.md +++ b/docs/components/index.md @@ -16,8 +16,6 @@ | [langchain_vector_store_embedding_search](langchain_vector_store_embedding_search.md) | Use LangChain Vector Stores to search a vector store with a semantic search. This will take text, run it through an embedding model with a query embedding and then find the closest matches in the store. | | [message_filter](message_filter.md) | A filtering component. This will apply a user configurable expression. If the expression evaluates to True, the message will be passed on. If the expression evaluates to False, the message will be discarded. If the message is discarded, any previous components that require an acknowledgement will be acknowledged. | | [pass_through](pass_through.md) | What goes in comes out | -| [slack_input](slack_input.md) | Slack input component. The component connects to Slack using the Bolt API and receives messages from Slack channels. | -| [slack_output](slack_output.md) | Slack output component. The component sends messages to Slack channels using the Bolt API. | | [stdin_input](stdin_input.md) | STDIN input component. The component will prompt for input, which will then be placed in the message payload using the output schema below. 
| | [stdout_output](stdout_output.md) | STDOUT output component | | [timer_input](timer_input.md) | An input that will generate a message at a specified interval. | diff --git a/docs/components/langchain_chat_model_with_history.md b/docs/components/langchain_chat_model_with_history.md index 5f828b9d..04ef74b1 100644 --- a/docs/components/langchain_chat_model_with_history.md +++ b/docs/components/langchain_chat_model_with_history.md @@ -13,6 +13,7 @@ component_config: langchain_component_config: llm_response_format: history_max_turns: + history_max_message_size: history_max_tokens: history_module: history_class: @@ -29,6 +30,7 @@ component_config: | langchain_component_config | True | | Model specific configuration for the chat model. See documentation for valid parameter names. | | llm_response_format | False | | The response format for this LLM request. This can be 'json', 'yaml', or 'text'. If set to 'json' or 'yaml', the response will be parsed by the appropriate parser and the fields will be available in the response object. If set to 'text', the response will be returned as a string. | | history_max_turns | False | 20 | The maximum number of turns to keep in the history. If not set, the history will be limited to 20 turns. | +| history_max_message_size | False | 1000 | The maximum amount of characters to keep in a single message in the history. | | history_max_tokens | False | 8000 | The maximum number of tokens to keep in the history. If not set, the history will be limited to 8000 tokens. | | history_module | False | langchain_community.chat_message_histories | The module that contains the history class. Default: 'langchain_community.chat_message_histories' | | history_class | False | ChatMessageHistory | The class to use for the history. 
Default: 'ChatMessageHistory' | diff --git a/docs/components/langchain_vector_store_embedding_search.md b/docs/components/langchain_vector_store_embedding_search.md index 76ae4e5a..6e24d839 100644 --- a/docs/components/langchain_vector_store_embedding_search.md +++ b/docs/components/langchain_vector_store_embedding_search.md @@ -66,6 +66,6 @@ component_config: | --- | --- | --- | | results | True | | | results.matches | False | | -| resultsmatches[].text | True | | -| resultsmatches[].metadata | False | | -| resultsmatches[].score | False | | +| results.matches[].text | True | | +| results.matches[].metadata | False | | +| results.matches[].score | False | | diff --git a/docs/components/slack_input.md b/docs/components/slack_input.md deleted file mode 100644 index de6110b1..00000000 --- a/docs/components/slack_input.md +++ /dev/null @@ -1,85 +0,0 @@ -# SlackInput - -Slack input component. The component connects to Slack using the Bolt API and receives messages from Slack channels. - -## Configuration Parameters - -```yaml -component_name: -component_module: slack_input -component_config: - slack_bot_token: - slack_app_token: - share_slack_connection: - max_file_size: - max_total_file_size: - listen_to_channels: - send_history_on_join: - acknowledgement_message: -``` - -| Parameter | Required | Default | Description | -| --- | --- | --- | --- | -| slack_bot_token | False | | The Slack bot token to connect to Slack. | -| slack_app_token | False | | The Slack app token to connect to Slack. | -| share_slack_connection | False | | Share the Slack connection with other components in this instance. | -| max_file_size | False | 20 | The maximum file size to download from Slack in MB. Default: 20MB | -| max_total_file_size | False | 20 | The maximum total file size to download from Slack in MB. Default: 20MB | -| listen_to_channels | False | False | Whether to listen to channels or not. Default: False | -| send_history_on_join | False | False | Send history on join. 
Default: False | -| acknowledgement_message | False | | The message to send to acknowledge the user's message has been received. | - - - -## Component Output Schema - -``` -{ - event: { - text: , - files: [ - { - name: , - content: , - mime_type: , - filetype: , - size: - }, - ... - ], - user_email: , - mentions: [ - , - ... - ], - type: , - user_id: , - client_msg_id: , - ts: , - channel: , - subtype: , - event_ts: , - channel_type: - } -} -``` -| Field | Required | Description | -| --- | --- | --- | -| event | True | | -| event.text | False | | -| event.files | False | | -| eventfiles[].name | False | | -| eventfiles[].content | False | | -| eventfiles[].mime_type | False | | -| eventfiles[].filetype | False | | -| eventfiles[].size | False | | -| event.user_email | False | | -| event.mentions | False | | -| event.type | False | | -| event.user_id | False | | -| event.client_msg_id | False | | -| event.ts | False | | -| event.channel | False | | -| event.subtype | False | | -| event.event_ts | False | | -| event.channel_type | False | | diff --git a/docs/components/slack_output.md b/docs/components/slack_output.md deleted file mode 100644 index e1ba4383..00000000 --- a/docs/components/slack_output.md +++ /dev/null @@ -1,74 +0,0 @@ -# SlackOutput - -Slack output component. The component sends messages to Slack channels using the Bolt API. - -## Configuration Parameters - -```yaml -component_name: -component_module: slack_output -component_config: - slack_bot_token: - slack_app_token: - share_slack_connection: -``` - -| Parameter | Required | Default | Description | -| --- | --- | --- | --- | -| slack_bot_token | False | | The Slack bot token to connect to Slack. | -| slack_app_token | False | | The Slack app token to connect to Slack. | -| share_slack_connection | False | | Share the Slack connection with other components in this instance. 
| - - -## Component Input Schema - -``` -{ - message_info: { - channel: , - type: , - user_email: , - client_msg_id: , - ts: , - subtype: , - event_ts: , - channel_type: , - user_id: , - session_id: - }, - content: { - text: , - files: [ - { - name: , - content: , - mime_type: , - filetype: , - size: - }, - ... - ] - } -} -``` -| Field | Required | Description | -| --- | --- | --- | -| message_info | True | | -| message_info.channel | True | | -| message_info.type | False | | -| message_info.user_email | False | | -| message_info.client_msg_id | False | | -| message_info.ts | False | | -| message_info.subtype | False | | -| message_info.event_ts | False | | -| message_info.channel_type | False | | -| message_info.user_id | False | | -| message_info.session_id | True | | -| content | True | | -| content.text | False | | -| content.files | False | | -| contentfiles[].name | False | | -| contentfiles[].content | False | | -| contentfiles[].mime_type | False | | -| contentfiles[].filetype | False | | -| contentfiles[].size | False | | diff --git a/pyproject.toml b/pyproject.toml index 8dc87810..e3154b17 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -5,7 +5,7 @@ build-backend = "hatchling.build" [project] name = "solace_ai_connector" -version = "0.0.1" +dynamic = ["version"] authors = [ { name="Edward Funnekotter", email="edward.funnekotter@solace.com" }, ] @@ -18,12 +18,12 @@ classifiers = [ "Operating System :: OS Independent", ] dependencies = [ - "boto3>=1.34.93", - "langchain_core>=0.2.4", - "PyYAML>=6.0.1", - "Requests>=2.32.3", - "slack_bolt>=1.18.1", - "solace_pubsubplus>=1.6.0", + "boto3~=1.34.122", + "langchain_core~=0.2.5", + "langchain~=0.2.3", + "PyYAML~=6.0.1", + "Requests~=2.32.3", + "solace_pubsubplus>=1.8.0", ] [project.urls] @@ -34,6 +34,30 @@ documentation = "https://github.com/SolaceLabs/solace-ai-connector/blob/main/doc [project.scripts] solace-ai-connector = "solace_ai_connector.main:main" +solace-ai-connector-gen-docs = 
"solace_ai_connector.tools.gen_component_docs:main" [tool.hatch.build.targets.wheel] packages = ["src/solace_ai_connector"] + +[tool.hatch.version] +path = "src/solace_ai_connector/__init__.py" + +[tool.hatch.envs.test] +dependencies = [ + "pytest>=8.2.2", + "coverage>=7.5.4", + "twine>=5.1.1", +] + +[tool.hatch.envs.lint] +detached = true +dependencies = [ + "ruff>=0.5.0", +] + +[tool.ruff] +lint.select = ["E4", "E7", "E9", "F"] +lint.ignore = ["F401", "E731"] + +[[tool.hatch.envs.test.matrix]] +python = ["38", "312"] diff --git a/requirements.txt b/requirements.txt index 26d24092..702aa66d 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,6 +1,6 @@ -boto3==1.34.93 -langchain_core==0.2.4 -PyYAML==6.0.1 -Requests==2.32.3 -slack_bolt==1.18.1 -solace_pubsubplus==1.6.0 +boto3~=1.34.122 +langchain_core~=0.2.5 +langchain~=0.2.3 +PyYAML~=6.0.1 +Requests~=2.32.3 +solace_pubsubplus~=1.8.0 diff --git a/slack.yaml b/slack.yaml deleted file mode 100644 index f6cfeb25..00000000 --- a/slack.yaml +++ /dev/null @@ -1,29 +0,0 @@ ---- -log: - stdout_log_level: INFO - log_file_level: DEBUG - log_file: solace_ai_connector.log - - -# List of flows -flows: - - name: slack_input - trace_level: DEBUG - components: - - component_name: slack_input - component_module: slack_input - component_config: - slack_bot_token: ${SLACK_BOT_TOKEN} - slack_app_token: ${SLACK_APP_TOKEN} - - - component_name: stdout_output - component_module: stdout_output - - # - name: broker_output - # component: outputs.solace_event_broker - # config: - # broker_connection_config: - # share_connection: solace - # topic_config: - # reply: true - \ No newline at end of file diff --git a/src/solace_ai_connector/common/messaging/solace_messaging.py b/src/solace_ai_connector/common/messaging/solace_messaging.py index 601298cc..ef1e29c5 100644 --- a/src/solace_ai_connector/common/messaging/solace_messaging.py +++ b/src/solace_ai_connector/common/messaging/solace_messaging.py @@ -2,6 +2,7 @@ import logging import os 
+import certifi from solace.messaging.messaging_service import ( MessagingService, @@ -130,6 +131,7 @@ def connect(self): "trust_store_path" ) or os.environ.get("TRUST_STORE") + or os.path.dirname(certifi.where()) or "/usr/share/ca-certificates/mozilla/", } # print (f"Broker Properties: {self.broker_properties}") diff --git a/src/solace_ai_connector/common/utils.py b/src/solace_ai_connector/common/utils.py index 2642cd2f..22a8e79f 100644 --- a/src/solace_ai_connector/common/utils.py +++ b/src/solace_ai_connector/common/utils.py @@ -5,6 +5,7 @@ import sys import re import builtins +import subprocess from .log import log @@ -94,22 +95,42 @@ def resolve_config_values(config, allow_source_expression=False): return config -def import_module(name, base_path=None): +def import_module(name, base_path=None, component_package=None): """Import a module by name""" + if component_package: + install_package(component_package) + if base_path: - if not os.path.exists(base_path): + if base_path not in sys.path: sys.path.append(base_path) try: module = importlib.import_module(name) - except ModuleNotFoundError: - try: - module = import_from_directories(name, base_path=base_path) - except Exception as e: - raise ImportError( - f"Module load error for {name}, base_path={base_path} ", e - ) from e - return module + return module + except ModuleNotFoundError as exc: + # If the module does not have a path associated with it, try + # importing it from the known prefixes - annoying that this + # is necessary. It seems you can't dynamically import a module + # that is listed in an __init__.py file :( + if "." 
not in name: + for prefix in [ + "solace_ai_connector.components", + "solace_ai_connector.components.general", + "solace_ai_connector.components.general.for_testing", + "solace_ai_connector.components.general.langchain", + "solace_ai_connector.components.inputs_outputs", + "solace_ai_connector.transforms", + "solace_ai_connector.common", + ]: + full_name = f"{prefix}.{name}" + try: + module = importlib.import_module(full_name) + return module + except ModuleNotFoundError: + pass + except Exception as e: + raise ImportError(f"Module load error for {full_name}: {e}") from e + raise ModuleNotFoundError(f"Module '{name}' not found") from exc def invoke_config(config, allow_source_expression=False): @@ -213,6 +234,14 @@ def call_function(function, params, allow_source_expression): return function(**params) +def install_package(package_name): + """Install a package using pip if it isn't already installed""" + try: + importlib.import_module(package_name) + except ImportError: + subprocess.run(["pip", "install", package_name], check=True) + + def extract_source_expression(se_call): # First remove the source_expression( and the trailing ) # Account for possible whitespace diff --git a/src/solace_ai_connector/components/__init__.py b/src/solace_ai_connector/components/__init__.py index c60701ec..a7b8043a 100644 --- a/src/solace_ai_connector/components/__init__.py +++ b/src/solace_ai_connector/components/__init__.py @@ -3,13 +3,12 @@ from .inputs_outputs import ( error_input, timer_input, - slack_output, broker_input, broker_output, stdout_output, stdin_input, - slack_input, ) + from .general import ( user_processor, aggregate, @@ -38,12 +37,10 @@ # Also import the components from the submodules from .inputs_outputs.error_input import ErrorInput from .inputs_outputs.timer_input import TimerInput -from .inputs_outputs.slack_output import SlackOutput from .inputs_outputs.broker_input import BrokerInput from .inputs_outputs.broker_output import BrokerOutput from 
.inputs_outputs.stdout_output import Stdout from .inputs_outputs.stdin_input import Stdin -from .inputs_outputs.slack_input import SlackInput from .general.user_processor import UserProcessor from .general.aggregate import Aggregate from .general.for_testing.need_ack_input import NeedAckInput diff --git a/src/solace_ai_connector/components/component_base.py b/src/solace_ai_connector/components/component_base.py index 4bbaf22b..aef1aa36 100644 --- a/src/solace_ai_connector/components/component_base.py +++ b/src/solace_ai_connector/components/component_base.py @@ -23,6 +23,8 @@ def __init__(self, module_info, **kwargs): self.config = kwargs.pop("config", {}) self.index = kwargs.pop("index", None) self.flow_name = kwargs.pop("flow_name", None) + self.flow_lock_manager = kwargs.pop("flow_lock_manager", None) + self.flow_kv_store = kwargs.pop("flow_kv_store", None) self.stop_signal = kwargs.pop("stop_signal", None) self.sibling_component = kwargs.pop("sibling_component", None) self.component_index = kwargs.pop("component_index", None) @@ -283,6 +285,15 @@ def set_queue_timeout(self, timeout_ms): def get_default_queue_timeout(self): return DEFAULT_QUEUE_TIMEOUT_MS + def get_lock(self, lock_name): + return self.flow_lock_manager.get_lock(lock_name) + + def kv_store_get(self, key): + return self.flow_kv_store.get(key) + + def kv_store_set(self, key, value): + self.flow_kv_store.set(key, value) + def setup_communications(self): self.queue_timeout_ms = None # pylint: disable=assignment-from-none self.queue_max_depth = self.config.get( diff --git a/src/solace_ai_connector/components/general/langchain/langchain_chat_model_with_history.py b/src/solace_ai_connector/components/general/langchain/langchain_chat_model_with_history.py index 62f3e1dd..bce75528 100644 --- a/src/solace_ai_connector/components/general/langchain/langchain_chat_model_with_history.py +++ b/src/solace_ai_connector/components/general/langchain/langchain_chat_model_with_history.py @@ -37,6 +37,12 @@ "If not 
set, the history will be limited to 20 turns.", "default": 20, }, + { + "name": "history_max_message_size", + "required": False, + "description": "The maximum amount of characters to keep in a single message in the history. ", + "default": 1000, + }, { "name": "history_max_tokens", "required": False, @@ -101,6 +107,9 @@ class LangChainChatModelWithHistory(LangChainChatModelBase): def __init__(self, **kwargs): super().__init__(info, **kwargs) self.history_max_turns = self.get_config("history_max_turns", 20) + self.history_max_message_size = self.get_config( + "history_max_message_size", 1000 + ) self.history_max_tokens = self.get_config("history_max_tokens", 8000) self.stream_to_flow = self.get_config("stream_to_flow", "") self.llm_mode = self.get_config("llm_mode", "none") @@ -176,6 +185,8 @@ def invoke_model( result = namedtuple("Result", ["content"])(aggregate_result) + self.prune_large_message_from_history(session_id) + return result def send_streaming_message(self, input_message, chunk, aggregate_result): @@ -215,6 +226,21 @@ def get_history(self, session_id: str) -> BaseChatMessageHistory: ] return self._histories[session_id] + def prune_large_message_from_history(self, session_id: str): + with self._lock: + # Loop over the last 2 messages in the history and truncate if needed + if ( + session_id in self._histories + and len(self._histories[session_id].messages) > 1 + ): + last_two_messages = self._histories[session_id].messages[-2:] + for message in last_two_messages: + if len(message.content) > self.history_max_message_size: + message.content = ( + message.content[: self.history_max_message_size] + + " ...truncated..." 
+ ) + def clear_history(self, session_id: str): with self._lock: if session_id in self._histories: diff --git a/src/solace_ai_connector/components/inputs_outputs/broker_input.py b/src/solace_ai_connector/components/inputs_outputs/broker_input.py index 40053413..a5fad8ef 100644 --- a/src/solace_ai_connector/components/inputs_outputs/broker_input.py +++ b/src/solace_ai_connector/components/inputs_outputs/broker_input.py @@ -115,7 +115,7 @@ def get_next_message(self, timeout_ms=None): "Received message from broker: topic=%s, user_properties=%s, payload length=%d", topic, user_properties, - len(payload), + len(payload) if payload is not None else 0, ) return Message(payload=payload, topic=topic, user_properties=user_properties) diff --git a/src/solace_ai_connector/components/inputs_outputs/slack_base.py b/src/solace_ai_connector/components/inputs_outputs/slack_base.py deleted file mode 100644 index 5d93e34a..00000000 --- a/src/solace_ai_connector/components/inputs_outputs/slack_base.py +++ /dev/null @@ -1,36 +0,0 @@ -"""Base class for all Slack components""" - -from abc import ABC, abstractmethod -from slack_bolt import App # pylint: disable=import-error -from ..component_base import ComponentBase - - -class SlackBase(ComponentBase, ABC): - _slack_apps = {} - - def __init__(self, module_info, **kwargs): - super().__init__(module_info, **kwargs) - self.slack_bot_token = self.get_config("slack_bot_token") - self.slack_app_token = self.get_config("slack_app_token") - self.max_file_size = self.get_config("max_file_size", 20) - self.max_total_file_size = self.get_config("max_total_file_size", 20) - self.share_slack_connection = self.get_config("share_slack_connection") - - if self.share_slack_connection: - if self.slack_bot_token not in SlackBase._slack_apps: - self.app = App(token=self.slack_bot_token) - SlackBase._slack_apps[self.slack_bot_token] = self.app - else: - self.app = SlackBase._slack_apps[self.slack_bot_token] - else: - self.app = 
App(token=self.slack_bot_token) - - @abstractmethod - def invoke(self, message, data): - pass - - def __str__(self): - return self.__class__.__name__ + " " + str(self.config) - - def __repr__(self): - return self.__str__() diff --git a/src/solace_ai_connector/components/inputs_outputs/slack_input.py b/src/solace_ai_connector/components/inputs_outputs/slack_input.py deleted file mode 100644 index 63712111..00000000 --- a/src/solace_ai_connector/components/inputs_outputs/slack_input.py +++ /dev/null @@ -1,440 +0,0 @@ -import threading -import queue -import base64 -import requests - - -from slack_bolt.adapter.socket_mode import SocketModeHandler -from .slack_base import SlackBase -from ...common.message import Message -from ...common.log import log - - -info = { - "class_name": "SlackInput", - "description": ( - "Slack input component. The component connects to Slack using the Bolt API " - "and receives messages from Slack channels." - ), - "config_parameters": [ - { - "name": "slack_bot_token", - "type": "string", - "description": "The Slack bot token to connect to Slack.", - }, - { - "name": "slack_app_token", - "type": "string", - "description": "The Slack app token to connect to Slack.", - }, - { - "name": "share_slack_connection", - "type": "string", - "description": "Share the Slack connection with other components in this instance.", - }, - { - "name": "max_file_size", - "type": "number", - "description": "The maximum file size to download from Slack in MB. Default: 20MB", - "default": 20, - "required": False, - }, - { - "name": "max_total_file_size", - "type": "number", - "description": "The maximum total file size to download " - "from Slack in MB. Default: 20MB", - "default": 20, - "required": False, - }, - { - "name": "listen_to_channels", - "type": "boolean", - "description": "Whether to listen to channels or not. 
Default: False", - "default": False, - "required": False, - }, - { - "name": "send_history_on_join", - "type": "boolean", - "description": "Send history on join. Default: False", - "default": False, - "required": False, - }, - { - "name": "acknowledgement_message", - "type": "string", - "description": "The message to send to acknowledge the user's message has been received.", - "required": False, - }, - ], - "output_schema": { - "type": "object", - "properties": { - "event": { - "type": "object", - "properties": { - "text": { - "type": "string", - }, - "files": { - "type": "array", - "items": { - "type": "object", - "properties": { - "name": { - "type": "string", - }, - "content": { - "type": "string", - }, - "mime_type": { - "type": "string", - }, - "filetype": { - "type": "string", - }, - "size": { - "type": "number", - }, - }, - }, - }, - "user_email": { - "type": "string", - }, - "mentions": { - "type": "array", - "items": { - "type": "string", - }, - }, - "type": { - "type": "string", - }, - "user_id": { - "type": "string", - }, - "client_msg_id": { - "type": "string", - }, - "ts": { - "type": "string", - }, - "channel": { - "type": "string", - }, - "subtype": { - "type": "string", - }, - "event_ts": { - "type": "string", - }, - "channel_type": { - "type": "string", - }, - }, - }, - }, - "required": ["event"], - }, -} - - -class SlackInput(SlackBase): - def __init__(self, **kwargs): - super().__init__(info, **kwargs) - self.slack_receiver_queue = None - self.slack_receiver = None - self.init_slack_receiver() - - def init_slack_receiver(self): - # Create a queue to get messages from the Slack receiver - self.slack_receiver_queue = queue.Queue() - self.stop_receiver_event = threading.Event() - self.slack_receiver = SlackReceiver( - app=self.app, - slack_app_token=self.slack_app_token, - slack_bot_token=self.slack_bot_token, - input_queue=self.slack_receiver_queue, - stop_event=self.stop_receiver_event, - max_file_size=self.get_config("max_file_size"), - 
max_total_file_size=self.get_config("max_total_file_size"), - listen_to_channels=self.get_config("listen_to_channels"), - send_history_on_join=self.get_config("send_history_on_join"), - acknowledgement_message=self.get_config("acknowledgement_message"), - ) - self.slack_receiver.start() - - def stop_component(self): - self.stop_slack_receiver() - - def stop_slack_receiver(self): - self.stop_receiver_event.set() - self.slack_receiver.join() - - def get_next_message(self): - # Get the next message from the Slack receiver queue - message = self.slack_receiver_queue.get() - return message - - def invoke(self, _message, data): - return data - - -class SlackReceiver(threading.Thread): - def __init__( - self, - app, - slack_app_token, - slack_bot_token, - input_queue, - stop_event, - max_file_size=20, - max_total_file_size=20, - listen_to_channels=False, - send_history_on_join=False, - acknowledgement_message=None, - ): - threading.Thread.__init__(self) - self.app = app - self.slack_app_token = slack_app_token - self.slack_bot_token = slack_bot_token - self.input_queue = input_queue - self.stop_event = stop_event - self.max_file_size = max_file_size - self.max_total_file_size = max_total_file_size - self.listen_to_channels = listen_to_channels - self.send_history_on_join = send_history_on_join - self.acknowledgement_message = acknowledgement_message - self.register_handlers() - - def run(self): - SocketModeHandler(self.app, self.slack_app_token).connect() - self.stop_event.wait() - - def handle_channel_event(self, event): - # For now, just do the normal handling - channel_name = self.get_channel_name(event.get("channel")) - event["channel_name"] = channel_name - - self.handle_event(event) - - def handle_group_event(self, _event): - log.info("Received a private group event. 
Ignoring.") - - def handle_event(self, event): - files = [] - total_file_size = 0 - if "files" in event: - for file in event["files"]: - file_url = file["url_private"] - file_name = file["name"] - size = file["size"] - total_file_size += size - if size > self.max_file_size * 1024 * 1024: - log.warning( - "File %s is too large to download. Skipping download.", - file_name, - ) - continue - if total_file_size > self.max_total_file_size * 1024 * 1024: - log.warning( - "Total file size exceeds the maximum limit. Skipping download." - ) - break - b64_file = self.download_file_as_base64_string(file_url) - files.append( - { - "name": file_name, - "content": b64_file, - "mime_type": file["mimetype"], - "filetype": file["filetype"], - "size": size, - } - ) - - team_domain = None - try: - permalink = self.app.client.chat_getPermalink( - channel=event["channel"], message_ts=event["event_ts"] - ) - team_domain = permalink.get("permalink", "").split("//")[1] - team_domain = team_domain.split(".")[0] - except Exception as e: - log.error("Error getting team domain: %s", e) - - user_email = self.get_user_email(event["user"]) - (text, mention_emails) = self.process_text_for_mentions(event["text"]) - payload = { - "text": text, - "files": files, - "user_email": user_email, - "team_id": event.get("team"), - "team_domain": team_domain, - "mentions": mention_emails, - "type": event.get("type"), - "client_msg_id": event.get("client_msg_id"), - "ts": event.get("thread_ts"), - "channel": event.get("channel"), - "channel_name": event.get("channel_name", ""), - "subtype": event.get("subtype"), - "event_ts": event.get("event_ts"), - "channel_type": event.get("channel_type"), - "user_id": event.get("user"), - } - user_properties = { - "user_email": user_email, - "team_id": event.get("team"), - "type": event.get("type"), - "client_msg_id": event.get("client_msg_id"), - "ts": event.get("thread_ts"), - "channel": event.get("channel"), - "subtype": event.get("subtype"), - "event_ts": 
event.get("event_ts"), - "channel_type": event.get("channel_type"), - "user_id": event.get("user"), - } - - if self.acknowledgement_message: - ack_msg_ts = self.app.client.chat_postMessage( - channel=event["channel"], - text=self.acknowledgement_message, - thread_ts=event.get("thread_ts"), - ).get("ts") - user_properties["ack_msg_ts"] = ack_msg_ts - - message = Message(payload=payload, user_properties=user_properties) - message.set_previous(payload) - self.input_queue.put(message) - - def download_file_as_base64_string(self, file_url): - headers = {"Authorization": "Bearer " + self.slack_bot_token} - response = requests.get(file_url, headers=headers, timeout=10) - base64_string = base64.b64encode(response.content).decode("utf-8") - return base64_string - - def get_user_email(self, user_id): - response = self.app.client.users_info(user=user_id) - return response["user"]["profile"].get("email", user_id) - - def process_text_for_mentions(self, text): - mention_emails = [] - for mention in text.split("<@"): - if mention.startswith("!"): - mention = mention[1:] - if mention.startswith("U"): - user_id = mention.split(">")[0] - response = self.app.client.users_info(user=user_id) - profile = response.get("user", {}).get("profile") - if profile: - replacement = profile.get( - "email", "<@" + profile.get("real_name_normalized") + ">" - ) - mention_emails.append(replacement) - text = text.replace( - f"<@{user_id}>", - replacement, - ) - return text, mention_emails - - def get_channel_name(self, channel_id): - response = self.app.client.conversations_info(channel=channel_id) - return response["channel"].get("name") - - def get_channel_history(self, channel_id, team_id): - response = self.app.client.conversations_history(channel=channel_id) - - # First search through messages to get all their replies - messages_to_add = [] - for message in response["messages"]: - if "subtype" not in message and "text" in message: - if "reply_count" in message: - # Get the replies - replies = 
self.app.client.conversations_replies( - channel=channel_id, ts=message.get("ts") - ) - messages_to_add.extend(replies["messages"]) - - response["messages"].extend(messages_to_add) - - # Go through the messages and remove any that have a sub_type - messages = [] - emails = {} - for message in response["messages"]: - if "subtype" not in message and "text" in message: - if message.get("user") not in emails: - emails[message.get("user")] = self.get_user_email( - message.get("user") - ) - payload = { - "text": message.get("text"), - "team_id": team_id, - "user_email": emails[message.get("user")], - "mentions": [], - "type": message.get("type"), - "client_msg_id": message.get("client_msg_id") or message.get("ts"), - "ts": message.get("ts"), - "event_ts": message.get("event_ts") or message.get("ts"), - "channel": channel_id, - "subtype": message.get("subtype"), - "user_id": message.get("user"), - "message_id": message.get("client_msg_id"), - } - messages.append(payload) - - return messages - - def handle_new_channel_join(self, event): - """We have been added to a new channel. 
This will get all the history and send it to the input queue.""" - history = self.get_channel_history(event.get("channel"), event.get("team")) - payload = { - "text": "New channel joined", - "user_email": "", - "mentions": [], - "type": "channel_join", - "client_msg_id": "", - "ts": "", - "channel": event.get("channel"), - "subtype": "channel_join", - "event_ts": "", - "channel_type": "channel", - "channel_name": self.get_channel_name(event.get("channel")), - "user_id": "", - "history": history, - } - user_properties = { - "type": "channel_join", - "channel": event.get("channel"), - "subtype": "channel_join", - "channel_type": "channel", - } - message = Message(payload=payload, user_properties=user_properties) - message.set_previous(payload) - self.input_queue.put(message) - - def register_handlers(self): - @self.app.event("message") - def handle_chat_message(event): - print("Got message event: ", event, event.get("channel_type")) - if event.get("channel_type") == "im": - self.handle_event(event) - elif event.get("channel_type") == "channel": - self.handle_channel_event(event) - elif event.get("channel_type") == "group": - self.handle_group_event(event) - - @self.app.event("app_mention") - def handle_app_mention(event): - print("Got app_mention event: ", event) - event["channel_type"] = "im" - event["channel_name"] = self.get_channel_name(event.get("channel")) - self.handle_event(event) - - @self.app.event("member_joined_channel") - def handle_member_joined_channel(event, _say, context): - if ( - self.send_history_on_join - and event.get("user") == context["bot_user_id"] - ): - self.handle_new_channel_join(event) diff --git a/src/solace_ai_connector/components/inputs_outputs/slack_output.py b/src/solace_ai_connector/components/inputs_outputs/slack_output.py deleted file mode 100644 index 2263f644..00000000 --- a/src/solace_ai_connector/components/inputs_outputs/slack_output.py +++ /dev/null @@ -1,176 +0,0 @@ -import base64 - - -from .slack_base import SlackBase 
-from ...common.log import log - - -info = { - "class_name": "SlackOutput", - "description": ( - "Slack output component. The component sends messages to Slack channels using the Bolt API." - ), - "config_parameters": [ - { - "name": "slack_bot_token", - "type": "string", - "description": "The Slack bot token to connect to Slack.", - }, - { - "name": "slack_app_token", - "type": "string", - "description": "The Slack app token to connect to Slack.", - }, - { - "name": "share_slack_connection", - "type": "string", - "description": "Share the Slack connection with other components in this instance.", - }, - ], - "input_schema": { - "type": "object", - "properties": { - "message_info": { - "type": "object", - "properties": { - "channel": { - "type": "string", - }, - "type": { - "type": "string", - }, - "user_email": { - "type": "string", - }, - "client_msg_id": { - "type": "string", - }, - "ts": { - "type": "string", - }, - "subtype": { - "type": "string", - }, - "event_ts": { - "type": "string", - }, - "channel_type": { - "type": "string", - }, - "user_id": { - "type": "string", - }, - "session_id": { - "type": "string", - }, - }, - "required": ["channel", "session_id"], - }, - "content": { - "type": "object", - "properties": { - "text": { - "type": "string", - }, - "files": { - "type": "array", - "items": { - "type": "object", - "properties": { - "name": { - "type": "string", - }, - "content": { - "type": "string", - }, - "mime_type": { - "type": "string", - }, - "filetype": { - "type": "string", - }, - "size": { - "type": "number", - }, - }, - }, - }, - }, - }, - }, - "required": ["message_info", "content"], - }, -} - - -class SlackOutput(SlackBase): - def __init__(self, **kwargs): - super().__init__(info, **kwargs) - - def invoke(self, message, data): - message_info = data.get("message_info") - content = data.get("content") - text = content.get("text") - stream = content.get("stream") - channel = message_info.get("channel") - thread_ts = message_info.get("ts") - 
ack_msg_ts = message_info.get("ack_msg_ts") - - return { - "channel": channel, - "text": text, - "files": content.get("files"), - "thread_ts": thread_ts, - "ack_msg_ts": ack_msg_ts, - "stream": stream, - } - - def send_message(self, message): - try: - channel = message.get_data("previous:channel") - messages = message.get_data("previous:text") - stream = message.get_data("previous:stream") - files = message.get_data("previous:files") or [] - thread_ts = message.get_data("previous:ts") - ack_msg_ts = message.get_data("previous:ack_msg_ts") - - if not isinstance(messages, list): - if messages is not None: - messages = [messages] - else: - messages = [] - - for text in messages: - if stream: - if ack_msg_ts: - try: - self.app.client.chat_update( - channel=channel, ts=ack_msg_ts, text=text - ) - except Exception: - # It is normal to possibly get an update after the final message has already - # arrived and deleted the ack message - pass - else: - self.app.client.chat_postMessage( - channel=channel, text=text, thread_ts=thread_ts - ) - - for file in files: - file_content = base64.b64decode(file["content"]) - self.app.client.files_upload_v2( - channel=channel, - file=file_content, - thread_ts=thread_ts, - filename=file["name"], - ) - except Exception as e: - log.error("Error sending slack message: %s", e) - - super().send_message(message) - - try: - if ack_msg_ts and not stream: - self.app.client.chat_delete(channel=channel, ts=ack_msg_ts) - except Exception: - pass diff --git a/src/solace_ai_connector/flow/flow.py b/src/solace_ai_connector/flow/flow.py index 20a0a3a7..413be2b8 100644 --- a/src/solace_ai_connector/flow/flow.py +++ b/src/solace_ai_connector/flow/flow.py @@ -1,10 +1,40 @@ """Main class for the flow""" +import threading + # from solace_ai_connector.common.log import log from ..common.utils import import_module +class FlowLockManager: + def __init__(self): + self._lock = threading.Lock() + self.locks = {} + + def get_lock(self, lock_name): + with self._lock: 
+ if lock_name not in self.locks: + self.locks[lock_name] = threading.Lock() + + return self.locks[lock_name] + + +class FlowKVStore: + def __init__(self): + self.store = {} + + def set(self, key, value): + self.store[key] = value + + def get(self, key): + return self.store.get(key, None) + + class Flow: + + _lock_manager = FlowLockManager() + _kv_store = FlowKVStore() + def __init__( self, flow_config, @@ -31,6 +61,8 @@ def __init__( self.connector = connector self.flow_input_queue = None self.threads = [] + self.flow_lock_manager = Flow._lock_manager + self.flow_kv_store = Flow._kv_store self.create_components() def create_components(self): @@ -55,12 +87,13 @@ def create_components(self): def create_component_group(self, component, index): component_module = component.get("component_module", "") base_path = component.get("component_base_path", None) + component_package = component.get("component_package", None) num_instances = component.get("num_instances", 1) # component_config = component.get("component_config", {}) # component_name = component.get("component_name", "") # imported_module = import_from_directories(component_module) - imported_module = import_module(component_module, base_path) + imported_module = import_module(component_module, base_path, component_package) try: self.module_info = getattr(imported_module, "info") @@ -80,6 +113,8 @@ def create_component_group(self, component, index): index=index, # module_info=self.module_info, flow_name=self.name, + flow_lock_manager=self.flow_lock_manager, + flow_kv_store=self.flow_kv_store, stop_signal=self.stop_signal, sibling_component=sibling_component, component_index=component_index, diff --git a/src/solace_ai_connector/tools/gen_component_docs.py b/src/solace_ai_connector/tools/gen_component_docs.py new file mode 100644 index 00000000..98610885 --- /dev/null +++ b/src/solace_ai_connector/tools/gen_component_docs.py @@ -0,0 +1,407 @@ +import os +import re +import sys +import json +import glob +import 
importlib.util +import yaml # pylint: disable=import-error + +sys.path.append("src") + + +# Function to descend into a directory and find all Python files +def find_python_files(directory): + for root, _, files in os.walk(directory): + for file in files: + # Skip if 'for_testing' is in the path + if "for_testing" in root: + continue + if file.endswith(".py"): + yield os.path.join(root, file) + + +# For each Python file, import it and see if it has a info dictionary at the top level +def find_info_dicts(directory): + for file in find_python_files(directory): + # Dynamically import the module + if file.endswith("__init__.py"): + continue + if "/solace_ai_connector/" in file: + module_name = re.sub( + r".*/solace_ai_connector/", + "solace_ai_connector/", + file, + ) + else: + # This does assume that the plugin is conforming to + # the standard directory structure + module_name = re.sub(r"src/", "", file) + + module_name = re.sub(r".py$", "", module_name) + module_name = re.sub(r"/", ".", module_name) + + spec = importlib.util.spec_from_file_location(module_name, file) + module = importlib.util.module_from_spec(spec) + spec.loader.exec_module(module) + # Check if the module has an info dictionary + if hasattr(module, "info"): + yield file, module.info + + +# For each info dictionary, create the markdown documentation +current_component = "" +full_info = {} + + +def create_markdown_documentation(directory, output_dir, module_type): + components = [] + + # full_info contains all the info dictionaries. 
This will be used later + # to produce an AI prompt to help users create a new configuration + full_info[module_type] = [] + for file, info in find_info_dicts(directory): + # Get the base file name without the extension + name = re.sub(r".*/", "", file) + name = re.sub(r".py$", "", name) + global current_component # pylint: disable=global-statement + current_component = name + + full_info[module_type].append(info) + + # Create the markdown documentation + markdown = f"# {info['class_name']}\n\n" + markdown += f"{info['description']}\n\n" + markdown += "## Configuration Parameters\n\n" + markdown += "```yaml\n" + if module_type == "component": + markdown += "component_name: \n" + markdown += f"component_module: {name}\n" + markdown += "component_config:\n" + elif module_type == "transform": + markdown += "input_transforms:\n" + markdown += f" type: {name}\n" + for param in info["config_parameters"]: + markdown += f" {param['name']}: <{param.get('type', 'string')}>\n" + markdown += "```\n\n" + + if "config_parameters" in info and len(info["config_parameters"]) > 0: + markdown += "| Parameter | Required | Default | Description |\n" + markdown += "| --- | --- | --- | --- |\n" + for param in info["config_parameters"]: + markdown += f"| {param['name']} | {param.get('required', False)} | {param.get('default', '')} | {param['description']} |\n" + markdown += "\n" + else: + markdown += "No configuration parameters\n\n" + + if "request_schema" in info: + print(f"{name} has a request schema") + if "input_schema" in info: + fields = [] + markdown += "\n## Component Input Schema\n\n```\n" + markdown += format_json_schema(info["input_schema"], fields) + markdown += "\n```\n" + # markdown += "\n## Component Input Schema Fields\n\n" + markdown += format_fields(fields) + + if "output_schema" in info: + fields = [] + markdown += "\n\n## Component Output Schema\n\n```\n" + markdown += format_json_schema(info["output_schema"], fields) + markdown += "\n```\n" + # markdown += "\n## 
Component Output Schema Fields\n\n" + markdown += format_fields(fields) + + if "example_config" in info: + markdown += "\n\n## Example Configuration\n\n" + markdown += info["example_config"] + + # Write all the files into "./docs" and change the .py to .md + # The files are uniquely named without the path, so we can remove that + file = re.sub(r".*/", "", file) + file = re.sub(r".py$", ".md", file) + components.append( + { + "file": file, + "name": re.sub(r"\..*", "", file), + "description": info.get( + "short_description", info.get("description", "") + ), + } + ) + file = f"{output_dir}/{file}" + + # Write the markdown to a file + with open(file, "w", encoding="utf-8") as f: + f.write(markdown) + + markdown = "" + + # Create the component index table + + # Capitalize the type + type_title = module_type.capitalize() if isinstance(module_type, str) else "" + markdown += f"# Built-in {type_title}s\n\n" + + markdown += "| Component | Description |\n" + markdown += "| --- | --- |\n" + + # Sort the components by name + components = sorted(components, key=lambda x: x["name"]) + + for component in components: + markdown += f"| [{component['name']}]({component['file']}) | {component['description']} |\n" + + with open(f"{output_dir}/index.md", "w", encoding="utf-8") as f: + f.write(markdown) + + +def create_ai_prompt(info): + """Use the info dictionary to create an AI prompt to help users create a + new configuration. This prompt will contain all the component and transform information, + information about the purpose of the connector and an example configuration. Later, the + user will have to provide the message {input_schema, queue, topic}, and the desired + output_schema and topic. + + """ + + system_prompt = ( + "You are an assistant who will help users create a new configuration for the " + "Solace AI Event Connector. 
The connector is a tool that allows users to create " + "flows that process messages from a Solace event broker, generally to help interface " + "with AI based services. A typical flow will start with a message from the broker, " + "pass through a series of components and transforms, and then send the message back to " + "the broker. The components and transforms are user-configurable and can be used to " + "manipulate the message in various ways. The user will have to provide the message " + "input_schema, queue, or topic, and the desired output_schema and topic. Your job is to " + "to create an initial configuration for the user. \n" + "Make sure you use ${ENV_VARS} for any sensitive information. \n" + "Your interaction with the user will via a chat interface. Before you generate the " + "YAML configuration, you will have to ask the user for the input_schema, queue, or topic, " + "and the desired output_schema and topic. \n" + "You can ask as many questions as you need to get the information you need. Try to make " + "the conversation flow naturally and confirm the user's input if there is any ambiguity - " + "for example, if they input the schema in a mixed JSON/YAML/pseudo structure, print it " + "back out for them in a clean YAML format and get confirmation that it is correct\n" + ) + + # Read in docs/configuration.md + with open("docs/configuration.md", "r", encoding="utf-8") as f: + configuration_prompt = f.read() + + # Read in an example configuration + # with open("examples/milvus_store.yaml", "r", encoding="utf-8") as f: + # example_config = f.read() + + prompt = ( + "Here is a structure that defines all the built-in components and transforms. \n" + f"\n{yaml.dump(info, default_flow_style=False)}\n" + "\n\n" + "Here is the markdown documentation for the configuration file: \n" + f"\n{configuration_prompt}\n\n" + "Here is an example configuration: \n" + "Take special care to ensure that the data format is correct as it moves component to " + "component. 
input_transforms will likely need to be created to ensure that the data is " + "in the correct format for each component. \n" + "Now, you will have to ask the user for the input_schema, queue, or topic, and the desired " + "output_schema and topic. \n" + ) + + # Write out a prompts.yaml file + prompts = { + "system_prompt": system_prompt, + "prompt": prompt, + } + with open("prompts.yaml", "w", encoding="utf-8") as f: + f.write(yaml.dump(prompts, default_style=">", default_flow_style=True)) + + print(prompts["system_prompt"]) + print(prompts["prompt"]) + with open("prompts.txt", "w", encoding="utf-8") as f: + f.write(prompts["system_prompt"]) + f.write(prompts["prompt"]) + + +def format_json_schema( + schema_dict, field_list, level=0, first_line_string="", prop_path="" +): + indent = " " * level + output = "" + if schema_dict is None: + print(f"Schema is None for {current_component}") + return "" + if "type" not in schema_dict: + print(f"Missing type in schema: {schema_dict} for {current_component}") + return "" + if schema_dict["type"] == "object": + # output += f"{indent}{{{first_line_string}\n" + output += f"{indent}{{{first_line_string}\n" + required = schema_dict.get("required", []) + for prop_name, prop_data in schema_dict.get("properties", {}).items(): + field_list.append( + { + "name": prop_path + "." 
+ prop_name if prop_path else prop_name, + "required": prop_name in required, + "description": prop_data.get("description", ""), + "data": prop_data, + } + ) + output += f"{indent} {prop_name}: " + output += format_json_schema( + prop_data, + field_list, + level + 1, + "", + prop_path + f".{prop_name}" if prop_path else prop_name, + ) + # If not the last property, add a comma + if prop_name != list(schema_dict["properties"].keys())[-1]: + output += "," + output += "\n" + # If there were no properties, add to indicate that any object is allowed + if not schema_dict.get("properties"): + output += f"{indent} \n" + output += f"{indent}}}" + elif schema_dict["type"] == "array": + # output += f"{indent}[{first_line_string}\n" + output += f"[{first_line_string}\n" + output += format_json_schema( + schema_dict.get("items"), field_list, level + 1, "", prop_path + "[]" + ) + output += f",\n{indent} ...\n" + output += f"{indent}]" + else: + output += f"{indent}<{schema_dict['type']}>" + + return output + + +def format_fields(fields): + if not fields or len(fields) == 0: + return "" + # Put the fields in a markdown table + output = "| Field | Required | Description |\n" + output += "| --- | --- | --- |\n" + for field in fields: + output += ( + f"| {field['name']} | {field['required']} | {field['description']} |\n" + ) + return output + + +def format_response_schema_for_markdown(response_schema): + """ + Converts a response schema dictionary into a Markdown-formatted string. + + Args: + response_schema (dict): The response schema dictionary. + + Returns: + str: A Markdown-formatted string representing the schema. 
+ """ + + def recursive_markdown(data, level=0): + """Recursively builds the Markdown.""" + lines = [] + indent = " " * level + + if data["type"] == "object": + lines.append(f"{indent}" "{") + for prop_name, prop_data in data.get("properties", {}).items(): + if prop_data.get("type", "invalid") == "object": + lines.append(f"{indent} {prop_name}:") + lines.extend(recursive_markdown(prop_data, level + 2)) + lines.append(f"{indent} {prop_name}:") + lines.extend(recursive_markdown(prop_data, level + 2)) + lines.append(f"{indent}" "}") + + elif data["type"] == "array": + lines.append(f"{indent}* **Array of:**") + lines.extend(recursive_markdown(data.get("items"), level + 1)) + + else: # Base type + lines.append(f"{indent}* **{data['type']}**") + + if "required" in data: + lines.append(f"{indent}_(Required fields: {', '.join(data['required'])})_") + + return lines + + # Start the Markdown output + output = "```json\n" + output += json.dumps(response_schema, indent=2) # Pretty-print JSON + output += "\n```\n\n" + + # Add formatted description using the recursive helper + output += "**Detailed Schema Description**\n\n" + output += "\n".join(recursive_markdown(response_schema)) + + return output + + +# Example schema: +# "output_schema": { +# "type": "object", +# "properties": { +# "results": { +# "type": "object", +# "properties": { +# "matches": { +# "type": "array", +# "items": { +# "type": "object", +# "properties": { +# "text": {"type": "string"}, +# "metadata": {"type": "object"}, +# "score": {"type": "float"}, +# }, +# "required": ["text"], +# }, +# }, +# }, +# } +# }, +# "required": ["results"], +# }, + + +def schema_as_human_readable_string(schema): + if schema["type"] == "object": + return schema_as_human_readable_string(schema["properties"]) + elif schema["type"] == "array": + return schema_as_human_readable_string(schema["items"]) + else: + return schema["type"] + + +def print_usage(): + # Get the basename of the script (remove dirs) + name = 
os.path.basename(sys.argv[0]) + print(f"Usage: {name} [base_directory]") + + +def main(): + # Get a base directory from the command line + if len(sys.argv) > 1: + base_dir = sys.argv[1] + elif not os.path.exists("src/solace_ai_connector"): + if glob.glob("src/*/components"): + base_dir = "." + else: + print("You must specify a base directory for the components\n") + print_usage() + else: + base_dir = "src/solace_ai_connector" + + # Call the function + create_markdown_documentation( + f"{base_dir}/components", "docs/components", "component" + ) + create_markdown_documentation( + f"{base_dir}/transforms", "docs/transforms", "transform" + ) + + # create_ai_prompt(full_info) + + +if __name__ == "__main__": + main() diff --git a/src/tools/gen_component_docs.py b/src/tools/gen_component_docs.py index 2556a308..3a215cb1 100644 --- a/src/tools/gen_component_docs.py +++ b/src/tools/gen_component_docs.py @@ -2,6 +2,7 @@ import re import sys import json +import glob import importlib.util import yaml # pylint: disable=import-error @@ -368,16 +369,35 @@ def schema_as_human_readable_string(schema): return schema["type"] +def print_usage(): + # Get the basename of the script (remove dirs) + name = os.path.basename(sys.argv[0]) + print(f"Usage: {name} [base_directory]") + + def main(): + # Get a base directory from the command line + if len(sys.argv) > 1: + base_dir = sys.argv[1] + elif not os.path.exists("src/solace_ai_connector"): + if glob.glob("src/*/components"): + base_dir = "." 
+ else: + print("You must specify a base directory for the components\n") + print_usage() + sys.exit(1) + else: + base_dir = "src/solace_ai_connector" + # Call the function create_markdown_documentation( - "src/solace_ai_connector/components", "docs/components", "component" + f"{base_dir}/components", "docs/components", "component" ) create_markdown_documentation( - "src/solace_ai_connector/transforms", "docs/transforms", "transform" + f"{base_dir}/transforms", "docs/transforms", "transform" ) - create_ai_prompt(full_info) + # create_ai_prompt(full_info) if __name__ == "__main__": diff --git a/tests/test_invoke.py b/tests/test_invoke.py index 80e2ceaa..e62d0303 100644 --- a/tests/test_invoke.py +++ b/tests/test_invoke.py @@ -339,7 +339,7 @@ def test_resolve_config_values(test): # Test the resolve_config_values function with a missing module def test_resolve_config_values_missing_module(): - with pytest.raises(ImportError, match="Could not import module 'missing_module'"): + with pytest.raises(ImportError, match="Module 'missing_module' not found"): resolve_config_values( { "a": { diff --git a/tests/test_message_get_set_data.py b/tests/test_message_get_set_data.py index d525b486..f2258d1f 100644 --- a/tests/test_message_get_set_data.py +++ b/tests/test_message_get_set_data.py @@ -427,6 +427,6 @@ def test_get_set_user_properties(): def test_get_set_previous(): """Test getting and setting the previous data of a message""" message = Message(payload=payloads["simple"]) - assert message.get_previous() == None + assert message.get_previous() is None message.set_previous(payloads["complex"]) assert message.get_previous() == payloads["complex"]