Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add DMS Serverless Operators #43988

Merged
merged 16 commits into from
Dec 17, 2024
Merged

Add DMS Serverless Operators #43988

merged 16 commits into from
Dec 17, 2024

Conversation

ellisms
Copy link
Contributor

@ellisms ellisms commented Nov 13, 2024


Adding operators, waiters, and triggers to support AWS DMS Serverless replications.
closes #39954

@eladkal
Copy link
Contributor

eladkal commented Nov 28, 2024

The current error in the CI is a valid one:
FAILED tests/always/test_project_structure.py::TestAmazonProviderProjectStructure::test_missing_examples - Failed: Not all classes are covered with example dags. Update self.MISSING_EXAMPLES_FOR_CLASSES if you want to skip this error

You'll need to add example dags or exclude if there is a reason

@ellisms
Copy link
Contributor Author

ellisms commented Dec 11, 2024

Added the missing example.

@eladkal
Copy link
Contributor

eladkal commented Dec 11, 2024

Test fail :(

tests/always/test_example_dags.py::test_should_be_importable[providers/tests/system/amazon/aws/example_dms_serverless.py] - AssertionError: import_errors={'/opt/airflow/providers/tests/system/amazon/aws/example_dms_serverless.py': 'Traceback (most recent call last):\n  File "<frozen importlib._bootstrap>", line 228, in _call_with_frames_removed\n  File "/opt/airflow/providers/tests/system/amazon/aws/example_dms_serverless.py", line 472, in <module>\n    from tests.system.utils.watcher import watcher\nModuleNotFoundError: No module named \'tests.system.utils\'\n'}
assert 1 == 0
 +  where 1 = len({'/opt/airflow/providers/tests/system/amazon/aws/example_dms_serverless.py': 'Traceback (most recent call last):\n  Fi...e>\n    from tests.system.utils.watcher import watcher\nModuleNotFoundError: No module named \'tests.system.utils\'\n'})
 +    where {'/opt/airflow/providers/tests/system/amazon/aws/example_dms_serverless.py': 'Traceback (most recent call last):\n  Fi...e>\n    from tests.system.utils.watcher import watcher\nModuleNotFoundError: No module named \'tests.system.utils\'\n'} = <airflow.models.dagbag.DagBag object at 0x7f0814d88190>.import_errors

Copy link
Contributor

@eladkal eladkal left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

@eladkal eladkal merged commit ce4236f into apache:main Dec 17, 2024
48 checks passed
@jscheffl
Copy link
Contributor

It seems the merge of this PR broke tests on main/canary builds:
https://github.com/apache/airflow/actions/runs/12380031516/job/34555973915

Somebody having an idea how to fix? Shall we revert?

@jscheffl
Copy link
Contributor

Okay, I thought I make a "quick fix" and add aiobotocore as dependency... but there is a bit of history and even as pre-commit check NOT to add this.
So loading/testing it seems need to be selective.

@potiuk
Copy link
Member

potiuk commented Dec 17, 2024

Fix merged in #45013

"MaxCapacityUnits": 4,
"MinCapacityUnits": 1,
"MultiAZ": False,
"ReplicationSubnetGroupId": "default",
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This test fails with the following stack trace:

Traceback (most recent call last):
  File "/opt/airflow/providers/src/airflow/providers/amazon/aws/hooks/dms.py", line 281, in create_replication_config
    resp = self.conn.create_replication_config(
  File "/usr/local/lib/python3.9/site-packages/botocore/client.py", line 569, in _api_call
    return self._make_api_call(operation_name, kwargs)
  File "/usr/local/lib/python3.9/site-packages/botocore/client.py", line 1023, in _make_api_call
    raise error_class(parsed_response, operation_name)
botocore.exceptions.ClientError: An error occurred (InvalidParameterValueException) when calling the CreateReplicationConfig operation: The provided value 'default' for field 'ReplicationSubnetGroupId' does not exist. Please verify the replication subnet group exists

Does it perhaps need a step to create (and later delete) a replication subnet group? (using this)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Odd, let me look into it.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Any luck?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, I'm having a problem running the DAG in Breeze. It seems the xcoms aren't being stored from tasks such as get_vpc_id, so when subsequent tasks try to use those values, they are missing. Even though the logs say valid values are returned. Haven't been able to track down what's going on.

Copy link
Contributor

@eladkal eladkal Jan 8, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not stored at all or stored with different id?
The task in question is wrapped with task group but the xcom pull is missing the group id
https://stackoverflow.com/a/72137722/14624409

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think it's being stored. When I view the get_default_vpc_id task in the Airflow UI, the Xcom table says "No Xcom". I confirmed the Xcom table in the databse is empty. The task log, however, shows the return value I expect:
{"logger":"airflow.task.operators.airflow.decorators.python._PythonDecoratedOperator","timestamp":"2025-01-07T19:21:51.381738","event":"Done. Returned value was: vpc-067c44fffb613cda6","level":"info"}

I didn't encounter this when I initially made the PR - everything worked. Has something changed?

LefterisXefteris pushed a commit to LefterisXefteris/airflow that referenced this pull request Jan 5, 2025
* Adding DMS serverless operators

---------

Co-authored-by: Niko Oliveira <[email protected]>
Co-authored-by: mse139 <[email protected]>
Co-authored-by: Elad Kalif <[email protected]>
got686-yandex pushed a commit to got686-yandex/airflow that referenced this pull request Jan 30, 2025
* Adding DMS serverless operators

---------

Co-authored-by: Niko Oliveira <[email protected]>
Co-authored-by: mse139 <[email protected]>
Co-authored-by: Elad Kalif <[email protected]>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Adding DMS serverless options to aws provider
7 participants