
feat: IBM watsonx.data Destination connector #428

Merged
mpolomdeepsense merged 55 commits into main from feat/ibm-watsonx-data-connector
Mar 25, 2025

Conversation

@mpolomdeepsense
Contributor

No description provided.

@mpolomdeepsense changed the title from "feat: IBM watsonx.data Uploader connector" to "feat: IBM watsonx.data Destination connector" on Mar 18, 2025
Contributor

@nikpocuca left a comment


very nice

@potter-potter
Contributor

I'll check this out later today.

@mpolomdeepsense
Contributor Author

mpolomdeepsense commented Mar 20, 2025

@rbiseck3

Is ibm_watson sufficient? I feel like ibm_watson_data is overly verbose and redundant.

Changed to ibm_watsonx

We should also drop all of the dependency updates from this PR and isolate it to just the new watson file added.

Changed, only updating ibm-watsonx and test dependencies

I'm assuming this new connector wasn't registered; when I run the CLI, I don't see watson listed under the destination connectors.

Not sure why you are not seeing it; I'm able to make CLI calls. I registered it inside setup.py and the connectors' __init__.py

Comment thread unstructured_ingest/v2/processes/connectors/ibm_watsonx.py
"uri": self.iceberg_url,
"token": bearer_token,
"warehouse": self.catalog,
"s3.endpoint": self.object_storage_url,
Contributor


Is ibm watson always backed by s3? Is this something we want configurable for other blob stores from other cloud providers if that's supported?

Contributor Author


I think you can make it use different types of storage, but from what I understand, the specific example they sent us, and the one they need an implementation for, uses S3.

Contributor


Then we should follow the same pattern we have for other connectors that can be backed by various blob stores: give this one an s3 suffix and create a method, i.e. def get_catalog_configs() -> dict, that gets called to generate this on a base class. For now we don't need to introduce the base class, but at least add the method so that if we introduce other blob stores, we can easily support them. Take a look at Databricks Volumes for an example of a connector supporting multiple blob stores.
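A minimal sketch of the pattern being suggested, assuming hypothetical config classes (the class and field names here are illustrative, not the actual connector code): a base config builds the blob-store-agnostic catalog properties, and the S3-backed subclass layers its own settings on top.

```python
from dataclasses import dataclass


@dataclass
class IbmWatsonxConnectionConfig:
    iceberg_url: str
    catalog: str

    def get_catalog_configs(self) -> dict:
        # Settings shared by every blob-store variant.
        return {"uri": self.iceberg_url, "warehouse": self.catalog}


@dataclass
class IbmWatsonxS3ConnectionConfig(IbmWatsonxConnectionConfig):
    object_storage_url: str = ""

    def get_catalog_configs(self) -> dict:
        configs = super().get_catalog_configs()
        # S3-specific settings layered on top of the base configs.
        configs["s3.endpoint"] = self.object_storage_url
        return configs
```

A future GCS- or Azure-backed variant would only override `get_catalog_configs()`, leaving the shared upload logic untouched.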

Contributor Author


Actually, it could be even more precise, ibm-watsonx-iceberg-s3, but let's stick with ibm-watsonx-s3 for now; we can always change it later.

Comment thread unstructured_ingest/v2/processes/connectors/ibm_watsonx.py
Comment thread unstructured_ingest/v2/processes/connectors/ibm_watsonx.py Outdated
def _upload_data_table(table: "Table", data_table: "ArrowTable", file_data: FileData):
try:
with table.transaction() as transaction:
self._delete(transaction, file_data.identifier)
Contributor


We might want to break this up into a delete transaction and an append transaction, where we wrap each in its own tenacity retry loop. Otherwise this is going to retry the delete each time, even if it already passed or doesn't apply.
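To illustrate the reviewer's suggestion (which the author pushes back on below): each step gets its own retry loop, so a delete that already succeeded is never re-run when only the append fails. The codebase uses tenacity for retries; this is a stdlib-only sketch with hypothetical delete/append calls, not the actual connector code.

```python
import functools
import time


def with_retries(max_retries: int = 5, delay: float = 0.0):
    """Minimal stand-in for a tenacity retry loop (illustrative only)."""
    def decorator(fn):
        @functools.wraps(fn)
        def wrapper(*args, **kwargs):
            for attempt in range(max_retries):
                try:
                    return fn(*args, **kwargs)
                except Exception:
                    if attempt == max_retries - 1:
                        raise
                    time.sleep(delay)
        return wrapper
    return decorator


@with_retries(max_retries=5)
def delete_records(table, identifier):
    table.delete(identifier)  # hypothetical delete call


@with_retries(max_retries=5)
def append_records(table, data_table):
    table.append(data_table)  # hypothetical append call


def upload_data_table(table, data_table, identifier):
    # Each step retries independently: a delete that succeeded on an
    # earlier attempt is not repeated when the append needs a retry.
    delete_records(table, identifier)
    append_records(table, data_table)
```

The trade-off, as the author notes below, is that splitting the transaction gives up atomic rollback and can multiply retries under Iceberg's optimistic concurrency.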

Contributor Author

mpolomdeepsense commented Mar 24, 2025


I think it's a bad idea. I ran some tests and:

  1. Because of the way Iceberg works, this is going to produce a lot of retries.
  2. I tried partitioning and uploading 4 files with separate transactions, and one file fails (on delete) when using max_retries=5, so only 3 of the 4 files got uploaded. It worked after changing it to max_retries=10.
  3. Because of so many retries, this makes the uploader very slow.
  4. Why even use a transaction? Isn't the point of a transaction to roll back the changes on failure? After the split we have two separate single operations that shouldn't require a transaction.

Comment thread unstructured_ingest/v2/processes/connectors/ibm_watsonx.py Outdated

4 participants