From 3eab2e8bd34aaa99f33509145257a5a97c49219d Mon Sep 17 00:00:00 2001 From: Maksim Moiseenkov Date: Fri, 27 Dec 2024 14:25:08 +0000 Subject: [PATCH] Implement AlloyDB create/update/delete user and backups operators --- .../operators/cloud/alloy_db.rst | 89 ++ .../providers/google/cloud/hooks/alloy_db.py | 429 ++++++- .../providers/google/cloud/links/alloy_db.py | 46 + .../google/cloud/operators/alloy_db.py | 599 +++++++++- .../airflow/providers/google/provider.yaml | 2 + .../tests/google/cloud/hooks/test_alloy_db.py | 323 +++++- .../tests/google/cloud/links/test_alloy_db.py | 38 +- .../google/cloud/operators/test_alloy_db.py | 1013 +++++++++++++++++ .../google/cloud/alloy_db/example_alloy_db.py | 99 ++ 9 files changed, 2633 insertions(+), 5 deletions(-) diff --git a/docs/apache-airflow-providers-google/operators/cloud/alloy_db.rst b/docs/apache-airflow-providers-google/operators/cloud/alloy_db.rst index c0f826ffa588c..637ebf8e757d4 100644 --- a/docs/apache-airflow-providers-google/operators/cloud/alloy_db.rst +++ b/docs/apache-airflow-providers-google/operators/cloud/alloy_db.rst @@ -113,3 +113,92 @@ To delete an AlloyDB instance you can use :dedent: 4 :start-after: [START howto_operator_alloy_db_delete_instance] :end-before: [END howto_operator_alloy_db_delete_instance] + +.. _howto/operator:AlloyDBCreateUserOperator: + +Create user +""""""""""" + +To create an AlloyDB user you can use +:class:`~airflow.providers.google.cloud.operators.alloy_db.AlloyDBCreateUserOperator`. Note that the primary instance +must be created in the cluster + +.. exampleinclude:: /../../providers/tests/system/google/cloud/alloy_db/example_alloy_db.py + :language: python + :dedent: 4 + :start-after: [START howto_operator_alloy_db_create_user] + :end-before: [END howto_operator_alloy_db_create_user] + + +.. _howto/operator:AlloyDBUpdateUserOperator: + +Update user +""""""""""" + +To update an AlloyDB user you can use +:class:`~airflow.providers.google.cloud.operators.alloy_db.AlloyDBUpdateUserOperator`. + +.. exampleinclude:: /../../providers/tests/system/google/cloud/alloy_db/example_alloy_db.py + :language: python + :dedent: 4 + :start-after: [START howto_operator_alloy_db_update_user] + :end-before: [END howto_operator_alloy_db_update_user] + + +.. _howto/operator:AlloyDBDeleteUserOperator: + +Delete user +""""""""""" + +To delete an AlloyDB user you can use +:class:`~airflow.providers.google.cloud.operators.alloy_db.AlloyDBDeleteUserOperator`. + +.. exampleinclude:: /../../providers/tests/system/google/cloud/alloy_db/example_alloy_db.py + :language: python + :dedent: 4 + :start-after: [START howto_operator_alloy_db_delete_user] + :end-before: [END howto_operator_alloy_db_delete_user] + +.. _howto/operator:AlloyDBCreateBackupOperator: + +Create backup +""""""""""""" + +To create an AlloyDB backup you can use +:class:`~airflow.providers.google.cloud.operators.alloy_db.AlloyDBCreateBackupOperator`. + +.. exampleinclude:: /../../providers/tests/system/google/cloud/alloy_db/example_alloy_db.py + :language: python + :dedent: 4 + :start-after: [START howto_operator_alloy_db_create_backup] + :end-before: [END howto_operator_alloy_db_create_backup] + + +.. _howto/operator:AlloyDBUpdateBackupOperator: + +Update backup +""""""""""""" + +To update an AlloyDB backup you can use +:class:`~airflow.providers.google.cloud.operators.alloy_db.AlloyDBUpdateBackupOperator`. + +.. 
exampleinclude:: /../../providers/tests/system/google/cloud/alloy_db/example_alloy_db.py + :language: python + :dedent: 4 + :start-after: [START howto_operator_alloy_db_update_backup] + :end-before: [END howto_operator_alloy_db_update_backup] + + +.. _howto/operator:AlloyDBDeleteBackupOperator: + +Delete backup +""""""""""""" + +To delete an AlloyDB backup you can use +:class:`~airflow.providers.google.cloud.operators.alloy_db.AlloyDBDeleteBackupOperator`. + +.. exampleinclude:: /../../providers/tests/system/google/cloud/alloy_db/example_alloy_db.py + :language: python + :dedent: 4 + :start-after: [START howto_operator_alloy_db_delete_backup] + :end-before: [END howto_operator_alloy_db_delete_backup] diff --git a/providers/src/airflow/providers/google/cloud/hooks/alloy_db.py b/providers/src/airflow/providers/google/cloud/hooks/alloy_db.py index 6da6e706de473..38a1b622e84fa 100644 --- a/providers/src/airflow/providers/google/cloud/hooks/alloy_db.py +++ b/providers/src/airflow/providers/google/cloud/hooks/alloy_db.py @@ -23,6 +23,7 @@ from copy import deepcopy from typing import TYPE_CHECKING +import tenacity from google.api_core.gapic_v1.method import DEFAULT, _MethodDefault from google.cloud import alloydb_v1 @@ -171,6 +172,11 @@ def create_secondary_cluster( metadata=metadata, ) + @tenacity.retry( + stop=tenacity.stop_after_attempt(5), + wait=tenacity.wait_exponential(multiplier=1, max=10), + retry=tenacity.retry_if_exception_type(ValueError), + ) @GoogleBaseHook.fallback_to_default_project_id def get_cluster( self, @@ -433,6 +439,11 @@ def create_secondary_instance( metadata=metadata, ) + @tenacity.retry( + stop=tenacity.stop_after_attempt(5), + wait=tenacity.wait_exponential(multiplier=1, max=10), + retry=tenacity.retry_if_exception_type(ValueError), + ) @GoogleBaseHook.fallback_to_default_project_id def get_instance( self, @@ -492,7 +503,7 @@ def update_instance( :param cluster_id: Required. ID of the cluster. :param instance_id: Required. ID of the cluster to update. - :param instance: Required. Cluster to create. For more details please see API documentation: + :param instance: Required. Cluster to update. For more details please see API documentation: https://cloud.google.com/python/docs/reference/alloydb/latest/google.cloud.alloydb_v1.types.Instance :param location: Required. The ID of the Google Cloud region where the cluster is located. :param update_mask: Optional. Field mask is used to specify the fields to be overwritten in the @@ -587,3 +598,419 @@ def delete_instance( timeout=timeout, metadata=metadata, ) + + @GoogleBaseHook.fallback_to_default_project_id + def create_user( + self, + user_id: str, + user: alloydb_v1.User | dict, + cluster_id: str, + location: str, + project_id: str = PROVIDE_PROJECT_ID, + request_id: str | None = None, + validate_only: bool = False, + retry: Retry | _MethodDefault = DEFAULT, + timeout: float | None = None, + metadata: Sequence[tuple[str, str]] = (), + ) -> alloydb_v1.User: + """ + Create a user in a given Alloy DB cluster. + + .. seealso:: + For more details see API documentation: + https://cloud.google.com/python/docs/reference/alloydb/latest/google.cloud.alloydb_v1.types.CreateUserRequest + + :param user_id: Required. ID of the user to create. + :param user: Required. The user to create. For more details please see API documentation: + https://cloud.google.com/python/docs/reference/alloydb/latest/google.cloud.alloydb_v1.types.User + :param cluster_id: Required. ID of the cluster for creating a user in. + :param location: Required. 
The ID of the Google Cloud region where the cluster is located.
+        :param project_id: Optional. The ID of the Google Cloud project where the cluster is located.
+        :param request_id: Optional. An optional request ID to identify requests. Specify a unique request ID
+            so that if you must retry your request, the server ignores the request if it has already been
+            completed. The server guarantees that for at least 60 minutes since the first request.
+            For example, consider a situation where you make an initial request and the request times out.
+            If you make the request again with the same request ID, the server can check if the original operation
+            with the same request ID was received, and if so, ignores the second request.
+            This prevents clients from accidentally creating duplicate commitments.
+            The request ID must be a valid UUID with the exception that zero UUID is not supported
+            (00000000-0000-0000-0000-000000000000).
+        :param validate_only: Optional. If set, performs request validation, but does not actually execute
+            the create request.
+        :param retry: Optional. Designation of what errors, if any, should be retried.
+        :param timeout: Optional. The timeout for this request.
+        :param metadata: Optional. Strings which should be sent along with the request as metadata.
+        """
+        client = self.get_alloy_db_admin_client()
+        return client.create_user(
+            request={
+                "parent": client.cluster_path(project_id, location, cluster_id),
+                "user_id": user_id,
+                "user": user,
+                "request_id": request_id,
+                "validate_only": validate_only,
+            },
+            retry=retry,
+            timeout=timeout,
+            metadata=metadata,
+        )
+
+    @tenacity.retry(
+        stop=tenacity.stop_after_attempt(5),
+        wait=tenacity.wait_exponential(multiplier=1, max=10),
+        retry=tenacity.retry_if_exception_type(ValueError),
+    )
+    @GoogleBaseHook.fallback_to_default_project_id
+    def get_user(
+        self,
+        user_id: str,
+        cluster_id: str,
+        location: str,
+        project_id: str = PROVIDE_PROJECT_ID,
+        retry: Retry | _MethodDefault = DEFAULT,
+        timeout: float | None = None,
+        metadata: Sequence[tuple[str, str]] = (),
+    ) -> alloydb_v1.User:
+        """
+        Get a user in a given Alloy DB cluster.
+
+        .. seealso::
+            For more details see API documentation:
+            https://cloud.google.com/python/docs/reference/alloydb/latest/google.cloud.alloydb_v1.types.GetUserRequest
+
+        :param user_id: Required. ID of the user to retrieve.
+        :param cluster_id: Required. ID of the cluster the user belongs to.
+        :param location: Required. The ID of the Google Cloud region where the cluster is located.
+        :param project_id: Optional. The ID of the Google Cloud project where the cluster is located.
+        :param retry: Optional. Designation of what errors, if any, should be retried.
+        :param timeout: Optional. The timeout for this request.
+        :param metadata: Optional. Strings which should be sent along with the request as metadata.
+        """
+        client = self.get_alloy_db_admin_client()
+        return client.get_user(
+            request={
+                "name": client.user_path(project_id, location, cluster_id, user_id),
+            },
+            retry=retry,
+            timeout=timeout,
+            metadata=metadata,
+        )
+
+    @GoogleBaseHook.fallback_to_default_project_id
+    def update_user(
+        self,
+        cluster_id: str,
+        user_id: str,
+        user: alloydb_v1.User | dict,
+        location: str,
+        update_mask: FieldMask | dict | None = None,
+        allow_missing: bool = False,
+        project_id: str = PROVIDE_PROJECT_ID,
+        request_id: str | None = None,
+        validate_only: bool = False,
+        retry: Retry | _MethodDefault = DEFAULT,
+        timeout: float | None = None,
+        metadata: Sequence[tuple[str, str]] = (),
+    ) -> alloydb_v1.User:
+        """
+        Update an Alloy DB user.
+
+        .. seealso::
+            For more details see API documentation:
+            https://cloud.google.com/python/docs/reference/alloydb/latest/google.cloud.alloydb_v1.types.UpdateUserRequest
+
+        :param cluster_id: Required. ID of the cluster.
+        :param user_id: Required. ID of the user to update.
+        :param user: Required. User to update. For more details please see API documentation:
+            https://cloud.google.com/python/docs/reference/alloydb/latest/google.cloud.alloydb_v1.types.User
+        :param location: Required. The ID of the Google Cloud region where the cluster is located.
+        :param update_mask: Optional. Field mask is used to specify the fields to be overwritten in the
+            User resource by the update.
+        :param request_id: Optional. An optional request ID to identify requests. Specify a unique request ID
+            so that if you must retry your request, the server ignores the request if it has already been
+            completed. The server guarantees that for at least 60 minutes since the first request.
+            For example, consider a situation where you make an initial request and the request times out.
+            If you make the request again with the same request ID, the server can check if the original operation
+            with the same request ID was received, and if so, ignores the second request.
+            This prevents clients from accidentally creating duplicate commitments.
+            The request ID must be a valid UUID with the exception that zero UUID is not supported
+            (00000000-0000-0000-0000-000000000000).
+        :param validate_only: Optional. If set, performs request validation, but does not actually execute
+            the update request.
+        :param allow_missing: Optional. If set to true, update succeeds even if the user is not found.
+            In that case, a new user is created and update_mask is ignored.
+        :param project_id: Optional. The ID of the Google Cloud project where the cluster is located.
+        :param retry: Optional. Designation of what errors, if any, should be retried.
+        :param timeout: Optional. The timeout for this request.
+        :param metadata: Optional. Strings which should be sent along with the request as metadata.
+ """ + client = self.get_alloy_db_admin_client() + _user = deepcopy(user) if isinstance(user, dict) else alloydb_v1.User.to_dict(user) + _user["name"] = client.user_path(project_id, location, cluster_id, user_id) + return client.update_user( + request={ + "update_mask": update_mask, + "user": _user, + "request_id": request_id, + "validate_only": validate_only, + "allow_missing": allow_missing, + }, + retry=retry, + timeout=timeout, + metadata=metadata, + ) + + @GoogleBaseHook.fallback_to_default_project_id + def delete_user( + self, + user_id: str, + cluster_id: str, + location: str, + project_id: str = PROVIDE_PROJECT_ID, + request_id: str | None = None, + validate_only: bool = False, + retry: Retry | _MethodDefault = DEFAULT, + timeout: float | None = None, + metadata: Sequence[tuple[str, str]] = (), + ): + """ + Delete an Alloy DB user. + + .. seealso:: + For more details see API documentation: + https://cloud.google.com/python/docs/reference/alloydb/latest/google.cloud.alloydb_v1.types.DeleteUserRequest + + :param user_id: Required. ID of the user to delete. + :param cluster_id: Required. ID of the cluster. + :param location: Required. The ID of the Google Cloud region where the instance is located. + :param project_id: Optional. The ID of the Google Cloud project where the instance is located. + :param request_id: Optional. An optional request ID to identify requests. Specify a unique request ID + so that if you must retry your request, the server ignores the request if it has already been + completed. The server guarantees that for at least 60 minutes since the first request. + For example, consider a situation where you make an initial request and the request times out. + If you make the request again with the same request ID, the server can check if the original operation + with the same request ID was received, and if so, ignores the second request. + This prevents clients from accidentally creating duplicate commitments. + The request ID must be a valid UUID with the exception that zero UUID is not supported + (00000000-0000-0000-0000-000000000000). + :param validate_only: Optional. If set, performs request validation, but does not actually execute + the delete request. + :param retry: Optional. Designation of what errors, if any, should be retried. + :param timeout: Optional. The timeout for this request. + :param metadata: Optional. Strings which should be sent along with the request as metadata. + """ + client = self.get_alloy_db_admin_client() + return client.delete_user( + request={ + "name": client.user_path(project_id, location, cluster_id, user_id), + "request_id": request_id, + "validate_only": validate_only, + }, + retry=retry, + timeout=timeout, + metadata=metadata, + ) + + @GoogleBaseHook.fallback_to_default_project_id + def create_backup( + self, + backup_id: str, + backup: alloydb_v1.Backup | dict, + location: str, + project_id: str = PROVIDE_PROJECT_ID, + request_id: str | None = None, + validate_only: bool = False, + retry: Retry | _MethodDefault = DEFAULT, + timeout: float | None = None, + metadata: Sequence[tuple[str, str]] = (), + ) -> Operation: + """ + Create a backup in a given Alloy DB cluster. + + .. seealso:: + For more details see API documentation: + https://cloud.google.com/python/docs/reference/alloydb/latest/google.cloud.alloydb_v1.types.CreateBackupRequest + + :param backup_id: Required. ID of the backup to create. + :param backup: Required. The backup to create. 
For more details please see API documentation:
+            https://cloud.google.com/python/docs/reference/alloydb/latest/google.cloud.alloydb_v1.types.Backup
+        :param location: Required. The ID of the Google Cloud region where the cluster is located.
+        :param project_id: Optional. The ID of the Google Cloud project where the cluster is located.
+        :param request_id: Optional. An optional request ID to identify requests. Specify a unique request ID
+            so that if you must retry your request, the server ignores the request if it has already been
+            completed. The server guarantees that for at least 60 minutes since the first request.
+            For example, consider a situation where you make an initial request and the request times out.
+            If you make the request again with the same request ID, the server can check if the original operation
+            with the same request ID was received, and if so, ignores the second request.
+            This prevents clients from accidentally creating duplicate commitments.
+            The request ID must be a valid UUID with the exception that zero UUID is not supported
+            (00000000-0000-0000-0000-000000000000).
+        :param validate_only: Optional. If set, performs request validation, but does not actually execute
+            the create request.
+        :param retry: Optional. Designation of what errors, if any, should be retried.
+        :param timeout: Optional. The timeout for this request.
+        :param metadata: Optional. Strings which should be sent along with the request as metadata.
+        """
+        client = self.get_alloy_db_admin_client()
+        return client.create_backup(
+            request={
+                "parent": client.common_location_path(project_id, location),
+                "backup_id": backup_id,
+                "backup": backup,
+                "request_id": request_id,
+                "validate_only": validate_only,
+            },
+            retry=retry,
+            timeout=timeout,
+            metadata=metadata,
+        )
+
+    @tenacity.retry(
+        stop=tenacity.stop_after_attempt(5),
+        wait=tenacity.wait_exponential(multiplier=1, max=10),
+        retry=tenacity.retry_if_exception_type(ValueError),
+    )
+    @GoogleBaseHook.fallback_to_default_project_id
+    def get_backup(
+        self,
+        backup_id: str,
+        location: str,
+        project_id: str = PROVIDE_PROJECT_ID,
+        retry: Retry | _MethodDefault = DEFAULT,
+        timeout: float | None = None,
+        metadata: Sequence[tuple[str, str]] = (),
+    ) -> alloydb_v1.Backup:
+        """
+        Get a backup in a given Alloy DB cluster.
+
+        .. seealso::
+            For more details see API documentation:
+            https://cloud.google.com/python/docs/reference/alloydb/latest/google.cloud.alloydb_v1.types.GetBackupRequest
+
+        :param backup_id: Required. ID of the backup to retrieve.
+        :param location: Required. The ID of the Google Cloud region where the cluster is located.
+        :param project_id: Optional. The ID of the Google Cloud project where the cluster is located.
+        :param retry: Optional. Designation of what errors, if any, should be retried.
+        :param timeout: Optional. The timeout for this request.
+        :param metadata: Optional. Strings which should be sent along with the request as metadata.
+        """
+        client = self.get_alloy_db_admin_client()
+        return client.get_backup(
+            request={
+                "name": client.backup_path(project_id, location, backup_id),
+            },
+            retry=retry,
+            timeout=timeout,
+            metadata=metadata,
+        )
+
+    @GoogleBaseHook.fallback_to_default_project_id
+    def update_backup(
+        self,
+        backup_id: str,
+        backup: alloydb_v1.Backup | dict,
+        location: str,
+        update_mask: FieldMask | dict | None = None,
+        allow_missing: bool = False,
+        project_id: str = PROVIDE_PROJECT_ID,
+        request_id: str | None = None,
+        validate_only: bool = False,
+        retry: Retry | _MethodDefault = DEFAULT,
+        timeout: float | None = None,
+        metadata: Sequence[tuple[str, str]] = (),
+    ) -> Operation:
+        """
+        Update an Alloy DB backup.
+
+        .. seealso::
+            For more details see API documentation:
+            https://cloud.google.com/python/docs/reference/alloydb/latest/google.cloud.alloydb_v1.types.UpdateBackupRequest
+
+        :param backup_id: Required. ID of the backup to update.
+        :param backup: Required. Backup to update. For more details please see API documentation:
+            https://cloud.google.com/python/docs/reference/alloydb/latest/google.cloud.alloydb_v1.types.Backup
+        :param location: Required. The ID of the Google Cloud region where the cluster is located.
+        :param update_mask: Optional. Field mask is used to specify the fields to be overwritten in the
+            Backup resource by the update.
+        :param request_id: Optional. An optional request ID to identify requests. Specify a unique request ID
+            so that if you must retry your request, the server ignores the request if it has already been
+            completed. The server guarantees that for at least 60 minutes since the first request.
+            For example, consider a situation where you make an initial request and the request times out.
+            If you make the request again with the same request ID, the server can check if the original operation
+            with the same request ID was received, and if so, ignores the second request.
+            This prevents clients from accidentally creating duplicate commitments.
+            The request ID must be a valid UUID with the exception that zero UUID is not supported
+            (00000000-0000-0000-0000-000000000000).
+        :param validate_only: Optional. If set, performs request validation, but does not actually execute
+            the update request.
+        :param allow_missing: Optional. If set to true, update succeeds even if the backup is not found.
+            In that case, a new backup is created and update_mask is ignored.
+        :param project_id: Optional. The ID of the Google Cloud project where the cluster is located.
+        :param retry: Optional. Designation of what errors, if any, should be retried.
+        :param timeout: Optional. The timeout for this request.
+        :param metadata: Optional. Strings which should be sent along with the request as metadata.
+ """ + client = self.get_alloy_db_admin_client() + _backup = deepcopy(backup) if isinstance(backup, dict) else alloydb_v1.Backup.to_dict(backup) + _backup["name"] = client.backup_path(project_id, location, backup_id) + return client.update_backup( + request={ + "update_mask": update_mask, + "backup": _backup, + "request_id": request_id, + "validate_only": validate_only, + "allow_missing": allow_missing, + }, + retry=retry, + timeout=timeout, + metadata=metadata, + ) + + @GoogleBaseHook.fallback_to_default_project_id + def delete_backup( + self, + backup_id: str, + location: str, + project_id: str = PROVIDE_PROJECT_ID, + request_id: str | None = None, + validate_only: bool = False, + retry: Retry | _MethodDefault = DEFAULT, + timeout: float | None = None, + metadata: Sequence[tuple[str, str]] = (), + ): + """ + Delete an Alloy DB backup. + + .. seealso:: + For more details see API documentation: + https://cloud.google.com/python/docs/reference/alloydb/latest/google.cloud.alloydb_v1.types.DeleteBackupRequest + + :param backup_id: Required. ID of the backup to delete. + :param location: Required. The ID of the Google Cloud region where the instance is located. + :param project_id: Optional. The ID of the Google Cloud project where the instance is located. + :param request_id: Optional. An optional request ID to identify requests. Specify a unique request ID + so that if you must retry your request, the server ignores the request if it has already been + completed. The server guarantees that for at least 60 minutes since the first request. + For example, consider a situation where you make an initial request and the request times out. + If you make the request again with the same request ID, the server can check if the original operation + with the same request ID was received, and if so, ignores the second request. + This prevents clients from accidentally creating duplicate commitments. + The request ID must be a valid UUID with the exception that zero UUID is not supported + (00000000-0000-0000-0000-000000000000). + :param validate_only: Optional. If set, performs request validation, but does not actually execute + the delete request. + :param retry: Optional. Designation of what errors, if any, should be retried. + :param timeout: Optional. The timeout for this request. + :param metadata: Optional. Strings which should be sent along with the request as metadata. 
+ """ + client = self.get_alloy_db_admin_client() + return client.delete_backup( + request={ + "name": client.backup_path(project_id, location, backup_id), + "request_id": request_id, + "validate_only": validate_only, + }, + retry=retry, + timeout=timeout, + metadata=metadata, + ) diff --git a/providers/src/airflow/providers/google/cloud/links/alloy_db.py b/providers/src/airflow/providers/google/cloud/links/alloy_db.py index 6b4c394a67cdd..05d9e9fa5a873 100644 --- a/providers/src/airflow/providers/google/cloud/links/alloy_db.py +++ b/providers/src/airflow/providers/google/cloud/links/alloy_db.py @@ -31,6 +31,10 @@ ALLOY_DB_CLUSTER_LINK = ( ALLOY_DB_BASE_LINK + "/locations/{location_id}/clusters/{cluster_id}?project={project_id}" ) +ALLOY_DB_USERS_LINK = ( + ALLOY_DB_BASE_LINK + "/locations/{location_id}/clusters/{cluster_id}/users?project={project_id}" +) +ALLOY_DB_BACKUPS_LINK = ALLOY_DB_BASE_LINK + "/backups?project={project_id}" class AlloyDBClusterLink(BaseGoogleLink): @@ -53,3 +57,45 @@ def persist( key=AlloyDBClusterLink.key, value={"location_id": location_id, "cluster_id": cluster_id, "project_id": project_id}, ) + + +class AlloyDBUsersLink(BaseGoogleLink): + """Helper class for constructing AlloyDB users Link.""" + + name = "AlloyDB Users" + key = "alloy_db_users" + format_str = ALLOY_DB_USERS_LINK + + @staticmethod + def persist( + context: Context, + task_instance: BaseOperator, + location_id: str, + cluster_id: str, + project_id: str | None, + ): + task_instance.xcom_push( + context, + key=AlloyDBUsersLink.key, + value={"location_id": location_id, "cluster_id": cluster_id, "project_id": project_id}, + ) + + +class AlloyDBBackupsLink(BaseGoogleLink): + """Helper class for constructing AlloyDB backups Link.""" + + name = "AlloyDB Backups" + key = "alloy_db_backups" + format_str = ALLOY_DB_BACKUPS_LINK + + @staticmethod + def persist( + context: Context, + task_instance: BaseOperator, + project_id: str | None, + ): + task_instance.xcom_push( + context, + key=AlloyDBBackupsLink.key, + value={"project_id": project_id}, + ) diff --git a/providers/src/airflow/providers/google/cloud/operators/alloy_db.py b/providers/src/airflow/providers/google/cloud/operators/alloy_db.py index bb7680b946f4a..634f97b6d9878 100644 --- a/providers/src/airflow/providers/google/cloud/operators/alloy_db.py +++ b/providers/src/airflow/providers/google/cloud/operators/alloy_db.py @@ -29,7 +29,11 @@ from airflow.exceptions import AirflowException from airflow.providers.google.cloud.hooks.alloy_db import AlloyDbHook -from airflow.providers.google.cloud.links.alloy_db import AlloyDBClusterLink +from airflow.providers.google.cloud.links.alloy_db import ( + AlloyDBBackupsLink, + AlloyDBClusterLink, + AlloyDBUsersLink, +) from airflow.providers.google.cloud.operators.cloud_base import GoogleCloudBaseOperator if TYPE_CHECKING: @@ -592,7 +596,7 @@ class AlloyDBUpdateInstanceOperator(AlloyDBWriteBaseOperator): :ref:`howto/operator:AlloyDBUpdateInstanceOperator` :param cluster_id: Required. ID of the cluster. - :param instance_id: Required. ID of the cluster to update. + :param instance_id: Required. ID of the instance to update. :param instance_configuration: Required. Instance to update. For more details please see API documentation: https://cloud.google.com/python/docs/reference/alloydb/latest/google.cloud.alloydb_v1.types.Instance :param update_mask: Optional. 
Field mask is used to specify the fields to be overwritten in the @@ -774,3 +778,594 @@ def execute(self, context: Context) -> None: if not self.validate_request: self.log.info("AlloyDB instance %s was successfully removed.", self.instance_id) + + +class AlloyDBCreateUserOperator(AlloyDBWriteBaseOperator): + """ + Create a User in an Alloy DB cluster. + + .. seealso:: + For more information on how to use this operator, take a look at the guide: + :ref:`howto/operator:AlloyDBCreateUserOperator` + + :param user_id: Required. ID of the user to create. + :param user_configuration: Required. The user to create. For more details please see API documentation: + https://cloud.google.com/python/docs/reference/alloydb/latest/google.cloud.alloydb_v1.types.User + :param cluster_id: Required. ID of the cluster for creating a user in. + :param request_id: Optional. An optional request ID to identify requests. Specify a unique request ID + so that if you must retry your request, the server ignores the request if it has already been + completed. The server guarantees that for at least 60 minutes since the first request. + For example, consider a situation where you make an initial request and the request times out. + If you make the request again with the same request ID, the server can check if the original operation + with the same request ID was received, and if so, ignores the second request. + This prevents clients from accidentally creating duplicate commitments. + The request ID must be a valid UUID with the exception that zero UUID is not supported + (00000000-0000-0000-0000-000000000000). + :param validate_request: Optional. If set, performs request validation, but does not actually + execute the request. + :param project_id: Required. The ID of the Google Cloud project where the service is used. + :param location: Required. The ID of the Google Cloud region where the service is used. + :param gcp_conn_id: Optional. The connection ID to use to connect to Google Cloud. + :param retry: Optional. A retry object used to retry requests. If `None` is specified, requests will not + be retried. + :param timeout: Optional. The amount of time, in seconds, to wait for the request to complete. + Note that if `retry` is specified, the timeout applies to each individual attempt. + :param metadata: Optional. Additional metadata that is provided to the method. + :param impersonation_chain: Optional service account to impersonate using short-term + credentials, or chained list of accounts required to get the access_token + of the last account in the list, which will be impersonated in the request. + If set as a string, the account must grant the originating account + the Service Account Token Creator IAM role. + If set as a sequence, the identities from the list must grant + Service Account Token Creator IAM role to the directly preceding identity, with first + account from the list granting this role to the originating account (templated). 
+ """ + + template_fields: Sequence[str] = tuple( + {"user_id", "user_configuration", "cluster_id"} | set(AlloyDBWriteBaseOperator.template_fields) + ) + operator_extra_links = (AlloyDBUsersLink(),) + + def __init__( + self, + user_id: str, + user_configuration: alloydb_v1.User | dict, + cluster_id: str, + *args, + **kwargs, + ): + super().__init__(*args, **kwargs) + self.user_id = user_id + self.user_configuration = user_configuration + self.cluster_id = cluster_id + + def _get_user(self) -> proto.Message | None: + self.log.info("Checking if the user %s exists already...", self.user_id) + try: + user = self.hook.get_user( + user_id=self.user_id, + cluster_id=self.cluster_id, + location=self.location, + project_id=self.project_id, + ) + except NotFound: + self.log.info("The user %s does not exist yet.", self.user_id) + except Exception as ex: + raise AirflowException(ex) from ex + else: + self.log.info( + "AlloyDB user %s already exists in the cluster %s.", + self.user_id, + self.cluster_id, + ) + result = alloydb_v1.User.to_dict(user) + return result + return None + + def execute(self, context: Context) -> dict | None: + AlloyDBUsersLink.persist( + context=context, + task_instance=self, + location_id=self.location, + cluster_id=self.cluster_id, + project_id=self.project_id, + ) + if (_user := self._get_user()) is not None: + return _user + + if self.validate_request: + self.log.info("Validating a Create AlloyDB user request.") + else: + self.log.info("Creating an AlloyDB user.") + + try: + user = self.hook.create_user( + user_id=self.user_id, + cluster_id=self.cluster_id, + user=self.user_configuration, + location=self.location, + project_id=self.project_id, + request_id=self.request_id, + validate_only=self.validate_request, + retry=self.retry, + timeout=self.timeout, + metadata=self.metadata, + ) + except Exception as ex: + raise AirflowException(ex) + else: + result = alloydb_v1.User.to_dict(user) if not self.validate_request else None + + if not self.validate_request: + self.log.info("AlloyDB user %s was successfully created.", self.user_id) + return result + + +class AlloyDBUpdateUserOperator(AlloyDBWriteBaseOperator): + """ + Update an Alloy DB user. + + .. seealso:: + For more information on how to use this operator, take a look at the guide: + :ref:`howto/operator:AlloyDBUpdateUserOperator` + + :param user_id: Required. The ID of the user to update. + :param cluster_id: Required. ID of the cluster. + :param user_configuration: Required. User to update. For more details please see API documentation: + https://cloud.google.com/python/docs/reference/alloydb/latest/google.cloud.alloydb_v1.types.User + :param update_mask: Optional. Field mask is used to specify the fields to be overwritten in the + User resource by the update. + :param request_id: Optional. An optional request ID to identify requests. Specify a unique request ID + so that if you must retry your request, the server ignores the request if it has already been + completed. The server guarantees that for at least 60 minutes since the first request. + For example, consider a situation where you make an initial request and the request times out. + If you make the request again with the same request ID, the server can check if the original operation + with the same request ID was received, and if so, ignores the second request. + This prevents clients from accidentally creating duplicate commitments. 
+ The request ID must be a valid UUID with the exception that zero UUID is not supported + (00000000-0000-0000-0000-000000000000). + :param validate_request: Optional. If set, performs request validation, but does not actually + execute the request. + :param allow_missing: Optional. If set to true, update succeeds even if instance is not found. + In that case, a new user is created and update_mask is ignored. + :param project_id: Required. The ID of the Google Cloud project where the service is used. + :param location: Required. The ID of the Google Cloud region where the service is used. + :param gcp_conn_id: Optional. The connection ID to use to connect to Google Cloud. + :param retry: Optional. A retry object used to retry requests. If `None` is specified, requests will not + be retried. + :param timeout: Optional. The amount of time, in seconds, to wait for the request to complete. + Note that if `retry` is specified, the timeout applies to each individual attempt. + :param metadata: Optional. Additional metadata that is provided to the method. + :param impersonation_chain: Optional service account to impersonate using short-term + credentials, or chained list of accounts required to get the access_token + of the last account in the list, which will be impersonated in the request. + If set as a string, the account must grant the originating account + the Service Account Token Creator IAM role. + If set as a sequence, the identities from the list must grant + Service Account Token Creator IAM role to the directly preceding identity, with first + account from the list granting this role to the originating account (templated). + """ + + template_fields: Sequence[str] = tuple( + {"cluster_id", "user_id", "user_configuration", "update_mask", "allow_missing"} + | set(AlloyDBWriteBaseOperator.template_fields) + ) + operator_extra_links = (AlloyDBUsersLink(),) + + def __init__( + self, + cluster_id: str, + user_id: str, + user_configuration: alloydb_v1.User | dict, + update_mask: FieldMask | dict | None = None, + allow_missing: bool = False, + *args, + **kwargs, + ): + super().__init__(*args, **kwargs) + self.cluster_id = cluster_id + self.user_id = user_id + self.user_configuration = user_configuration + self.update_mask = update_mask + self.allow_missing = allow_missing + + def execute(self, context: Context) -> dict | None: + AlloyDBUsersLink.persist( + context=context, + task_instance=self, + location_id=self.location, + cluster_id=self.cluster_id, + project_id=self.project_id, + ) + if self.validate_request: + self.log.info("Validating an Update AlloyDB user request.") + else: + self.log.info("Updating an AlloyDB user.") + + try: + user = self.hook.update_user( + cluster_id=self.cluster_id, + user_id=self.user_id, + project_id=self.project_id, + location=self.location, + user=self.user_configuration, + update_mask=self.update_mask, + allow_missing=self.allow_missing, + request_id=self.request_id, + validate_only=self.validate_request, + retry=self.retry, + timeout=self.timeout, + metadata=self.metadata, + ) + except Exception as ex: + raise AirflowException(ex) from ex + else: + result = alloydb_v1.User.to_dict(user) if not self.validate_request else None + + if not self.validate_request: + self.log.info("AlloyDB user %s was successfully updated.", self.user_id) + return result + + +class AlloyDBDeleteUserOperator(AlloyDBWriteBaseOperator): + """ + Delete an Alloy DB user. + + .. 
seealso:: + For more information on how to use this operator, take a look at the guide: + :ref:`howto/operator:AlloyDBDeleteUserOperator` + + :param user_id: Required. ID of the user to delete. + :param cluster_id: Required. ID of the cluster. + :param request_id: Optional. An optional request ID to identify requests. Specify a unique request ID + so that if you must retry your request, the server ignores the request if it has already been + completed. The server guarantees that for at least 60 minutes since the first request. + For example, consider a situation where you make an initial request and the request times out. + If you make the request again with the same request ID, the server can check if the original operation + with the same request ID was received, and if so, ignores the second request. + This prevents clients from accidentally creating duplicate commitments. + The request ID must be a valid UUID with the exception that zero UUID is not supported + (00000000-0000-0000-0000-000000000000). + :param validate_request: Optional. If set, performs request validation, but does not actually + execute the request. + :param project_id: Required. The ID of the Google Cloud project where the service is used. + :param location: Required. The ID of the Google Cloud region where the service is used. + :param gcp_conn_id: Optional. The connection ID to use to connect to Google Cloud. + :param retry: Optional. A retry object used to retry requests. If `None` is specified, requests will not + be retried. + :param timeout: Optional. The amount of time, in seconds, to wait for the request to complete. + Note that if `retry` is specified, the timeout applies to each individual attempt. + :param metadata: Optional. Additional metadata that is provided to the method. + :param impersonation_chain: Optional service account to impersonate using short-term + credentials, or chained list of accounts required to get the access_token + of the last account in the list, which will be impersonated in the request. + If set as a string, the account must grant the originating account + the Service Account Token Creator IAM role. + If set as a sequence, the identities from the list must grant + Service Account Token Creator IAM role to the directly preceding identity, with first + account from the list granting this role to the originating account (templated). + """ + + template_fields: Sequence[str] = tuple( + {"user_id", "cluster_id"} | set(AlloyDBWriteBaseOperator.template_fields) + ) + + def __init__( + self, + user_id: str, + cluster_id: str, + *args, + **kwargs, + ): + super().__init__(*args, **kwargs) + self.user_id = user_id + self.cluster_id = cluster_id + + def execute(self, context: Context) -> None: + if self.validate_request: + self.log.info("Validating a Delete AlloyDB user request.") + else: + self.log.info("Deleting an AlloyDB user.") + + try: + self.hook.delete_user( + user_id=self.user_id, + cluster_id=self.cluster_id, + project_id=self.project_id, + location=self.location, + request_id=self.request_id, + validate_only=self.validate_request, + retry=self.retry, + timeout=self.timeout, + metadata=self.metadata, + ) + except Exception as ex: + raise AirflowException(ex) from ex + + if not self.validate_request: + self.log.info("AlloyDB user %s was successfully removed.", self.user_id) + + +class AlloyDBCreateBackupOperator(AlloyDBWriteBaseOperator): + """ + Create a Backup in an Alloy DB cluster. + + .. 
seealso:: + For more information on how to use this operator, take a look at the guide: + :ref:`howto/operator:AlloyDBCreateBackupOperator` + + :param backup_id: Required. ID of the backup to create. + :param backup_configuration: Required. Backup to create. For more details please see API documentation: + https://cloud.google.com/python/docs/reference/alloydb/latest/google.cloud.alloydb_v1.types.Backup + :param request_id: Optional. An optional request ID to identify requests. Specify a unique request ID + so that if you must retry your request, the server ignores the request if it has already been + completed. The server guarantees that for at least 60 minutes since the first request. + For example, consider a situation where you make an initial request and the request times out. + If you make the request again with the same request ID, the server can check if the original operation + with the same request ID was received, and if so, ignores the second request. + This prevents clients from accidentally creating duplicate commitments. + The request ID must be a valid UUID with the exception that zero UUID is not supported + (00000000-0000-0000-0000-000000000000). + :param validate_request: Optional. If set, performs request validation, but does not actually + execute the request. + :param project_id: Required. The ID of the Google Cloud project where the service is used. + :param location: Required. The ID of the Google Cloud region where the backups should be saved. + :param gcp_conn_id: Optional. The connection ID to use to connect to Google Cloud. + :param retry: Optional. A retry object used to retry requests. If `None` is specified, requests will not + be retried. + :param timeout: Optional. The amount of time, in seconds, to wait for the request to complete. + Note that if `retry` is specified, the timeout applies to each individual attempt. + :param metadata: Optional. Additional metadata that is provided to the method. + :param impersonation_chain: Optional service account to impersonate using short-term + credentials, or chained list of accounts required to get the access_token + of the last account in the list, which will be impersonated in the request. + If set as a string, the account must grant the originating account + the Service Account Token Creator IAM role. + If set as a sequence, the identities from the list must grant + Service Account Token Creator IAM role to the directly preceding identity, with first + account from the list granting this role to the originating account (templated). 
+ """ + + template_fields: Sequence[str] = tuple( + {"backup_id", "backup_configuration"} | set(AlloyDBWriteBaseOperator.template_fields) + ) + operator_extra_links = (AlloyDBBackupsLink(),) + + def __init__( + self, + backup_id: str, + backup_configuration: alloydb_v1.Backup | dict, + *args, + **kwargs, + ): + super().__init__(*args, **kwargs) + self.backup_id = backup_id + self.backup_configuration = backup_configuration + + def _get_backup(self) -> proto.Message | None: + self.log.info("Checking if the backup %s exists already...", self.backup_id) + try: + backup = self.hook.get_backup( + backup_id=self.backup_id, + location=self.location, + project_id=self.project_id, + ) + except NotFound: + self.log.info("The backup %s does not exist yet.", self.backup_id) + except Exception as ex: + raise AirflowException(ex) from ex + else: + self.log.info("AlloyDB backup %s already exists.", self.backup_id) + result = alloydb_v1.Backup.to_dict(backup) + return result + return None + + def execute(self, context: Context) -> dict | None: + AlloyDBBackupsLink.persist( + context=context, + task_instance=self, + project_id=self.project_id, + ) + if backup := self._get_backup(): + return backup + + if self.validate_request: + self.log.info("Validating a Create AlloyDB backup request.") + else: + self.log.info("Creating an AlloyDB backup.") + + try: + operation = self.hook.create_backup( + backup_id=self.backup_id, + backup=self.backup_configuration, + location=self.location, + project_id=self.project_id, + request_id=self.request_id, + validate_only=self.validate_request, + retry=self.retry, + timeout=self.timeout, + metadata=self.metadata, + ) + except Exception as ex: + raise AirflowException(ex) + else: + operation_result = self.get_operation_result(operation) + result = alloydb_v1.Backup.to_dict(operation_result) if operation_result else None + + return result + + +class AlloyDBUpdateBackupOperator(AlloyDBWriteBaseOperator): + """ + Update an Alloy DB backup. + + .. seealso:: + For more information on how to use this operator, take a look at the guide: + :ref:`howto/operator:AlloyDBUpdateBackupOperator` + + :param backup_id: Required. ID of the backup to update. + :param backup_configuration: Required. Backup to update. For more details please see API documentation: + https://cloud.google.com/python/docs/reference/alloydb/latest/google.cloud.alloydb_v1.types.Backup + :param update_mask: Optional. Field mask is used to specify the fields to be overwritten in the + Backup resource by the update. + :param request_id: Optional. An optional request ID to identify requests. Specify a unique request ID + so that if you must retry your request, the server ignores the request if it has already been + completed. The server guarantees that for at least 60 minutes since the first request. + For example, consider a situation where you make an initial request and the request times out. + If you make the request again with the same request ID, the server can check if the original operation + with the same request ID was received, and if so, ignores the second request. + This prevents clients from accidentally creating duplicate commitments. + The request ID must be a valid UUID with the exception that zero UUID is not supported + (00000000-0000-0000-0000-000000000000). + :param validate_request: Optional. If set, performs request validation, but does not actually + execute the request. + :param allow_missing: Optional. If set to true, update succeeds even if backup is not found. 
+ In that case, a new backup is created and update_mask is ignored. + :param project_id: Required. The ID of the Google Cloud project where the service is used. + :param location: Required. The ID of the Google Cloud region where the service is used. + :param gcp_conn_id: Optional. The connection ID to use to connect to Google Cloud. + :param retry: Optional. A retry object used to retry requests. If `None` is specified, requests will not + be retried. + :param timeout: Optional. The amount of time, in seconds, to wait for the request to complete. + Note that if `retry` is specified, the timeout applies to each individual attempt. + :param metadata: Optional. Additional metadata that is provided to the method. + :param impersonation_chain: Optional service account to impersonate using short-term + credentials, or chained list of accounts required to get the access_token + of the last account in the list, which will be impersonated in the request. + If set as a string, the account must grant the originating account + the Service Account Token Creator IAM role. + If set as a sequence, the identities from the list must grant + Service Account Token Creator IAM role to the directly preceding identity, with first + account from the list granting this role to the originating account (templated). + """ + + template_fields: Sequence[str] = tuple( + {"backup_id", "backup_configuration", "update_mask", "allow_missing"} + | set(AlloyDBWriteBaseOperator.template_fields) + ) + operator_extra_links = (AlloyDBBackupsLink(),) + + def __init__( + self, + backup_id: str, + backup_configuration: alloydb_v1.Backup | dict, + update_mask: FieldMask | dict | None = None, + allow_missing: bool = False, + *args, + **kwargs, + ): + super().__init__(*args, **kwargs) + self.backup_id = backup_id + self.backup_configuration = backup_configuration + self.update_mask = update_mask + self.allow_missing = allow_missing + + def execute(self, context: Context) -> dict | None: + AlloyDBBackupsLink.persist( + context=context, + task_instance=self, + project_id=self.project_id, + ) + if self.validate_request: + self.log.info("Validating an Update AlloyDB backup request.") + else: + self.log.info("Updating an AlloyDB backup.") + + try: + operation = self.hook.update_backup( + backup_id=self.backup_id, + project_id=self.project_id, + location=self.location, + backup=self.backup_configuration, + update_mask=self.update_mask, + allow_missing=self.allow_missing, + request_id=self.request_id, + validate_only=self.validate_request, + retry=self.retry, + timeout=self.timeout, + metadata=self.metadata, + ) + except Exception as ex: + raise AirflowException(ex) from ex + else: + operation_result = self.get_operation_result(operation) + result = alloydb_v1.Backup.to_dict(operation_result) if operation_result else None + + if not self.validate_request: + self.log.info("AlloyDB backup %s was successfully updated.", self.backup_id) + return result + + +class AlloyDBDeleteBackupOperator(AlloyDBWriteBaseOperator): + """ + Delete an Alloy DB backup. + + .. seealso:: + For more information on how to use this operator, take a look at the guide: + :ref:`howto/operator:AlloyDBDeleteBackupOperator` + + :param backup_id: Required. ID of the backup to delete. + :param request_id: Optional. An optional request ID to identify requests. Specify a unique request ID + so that if you must retry your request, the server ignores the request if it has already been + completed. The server guarantees that for at least 60 minutes since the first request. 
+ For example, consider a situation where you make an initial request and the request times out. + If you make the request again with the same request ID, the server can check if the original operation + with the same request ID was received, and if so, ignores the second request. + This prevents clients from accidentally creating duplicate commitments. + The request ID must be a valid UUID with the exception that zero UUID is not supported + (00000000-0000-0000-0000-000000000000). + :param validate_request: Optional. If set, performs request validation, but does not actually + execute the request. + :param project_id: Required. The ID of the Google Cloud project where the service is used. + :param location: Required. The ID of the Google Cloud region where the service is used. + :param gcp_conn_id: Optional. The connection ID to use to connect to Google Cloud. + :param retry: Optional. A retry object used to retry requests. If `None` is specified, requests will not + be retried. + :param timeout: Optional. The amount of time, in seconds, to wait for the request to complete. + Note that if `retry` is specified, the timeout applies to each individual attempt. + :param metadata: Optional. Additional metadata that is provided to the method. + :param impersonation_chain: Optional service account to impersonate using short-term + credentials, or chained list of accounts required to get the access_token + of the last account in the list, which will be impersonated in the request. + If set as a string, the account must grant the originating account + the Service Account Token Creator IAM role. + If set as a sequence, the identities from the list must grant + Service Account Token Creator IAM role to the directly preceding identity, with first + account from the list granting this role to the originating account (templated). 
+ """ + + template_fields: Sequence[str] = tuple({"backup_id"} | set(AlloyDBWriteBaseOperator.template_fields)) + + def __init__( + self, + backup_id: str, + *args, + **kwargs, + ): + super().__init__(*args, **kwargs) + self.backup_id = backup_id + + def execute(self, context: Context) -> None: + if self.validate_request: + self.log.info("Validating a Delete AlloyDB backup request.") + else: + self.log.info("Deleting an AlloyDB backup.") + + try: + operation = self.hook.delete_backup( + backup_id=self.backup_id, + project_id=self.project_id, + location=self.location, + request_id=self.request_id, + validate_only=self.validate_request, + retry=self.retry, + timeout=self.timeout, + metadata=self.metadata, + ) + except Exception as ex: + raise AirflowException(ex) from ex + else: + self.get_operation_result(operation) + + if not self.validate_request: + self.log.info("AlloyDB backup %s was successfully removed.", self.backup_id) diff --git a/providers/src/airflow/providers/google/provider.yaml b/providers/src/airflow/providers/google/provider.yaml index b253967472d53..6072d617b82d9 100644 --- a/providers/src/airflow/providers/google/provider.yaml +++ b/providers/src/airflow/providers/google/provider.yaml @@ -1186,7 +1186,9 @@ connection-types: connection-type: leveldb extra-links: + - airflow.providers.google.cloud.links.alloy_db.AlloyDBBackupsLink - airflow.providers.google.cloud.links.alloy_db.AlloyDBClusterLink + - airflow.providers.google.cloud.links.alloy_db.AlloyDBUsersLink - airflow.providers.google.cloud.links.dataform.DataformRepositoryLink - airflow.providers.google.cloud.links.dataform.DataformWorkspaceLink - airflow.providers.google.cloud.links.dataform.DataformWorkflowInvocationLink diff --git a/providers/tests/google/cloud/hooks/test_alloy_db.py b/providers/tests/google/cloud/hooks/test_alloy_db.py index ad4f4dfd597de..09891b0cf8083 100644 --- a/providers/tests/google/cloud/hooks/test_alloy_db.py +++ b/providers/tests/google/cloud/hooks/test_alloy_db.py @@ -43,7 +43,12 @@ TEST_FORCE = False TEST_REQUEST_ID = "test_request_id" TEST_VALIDATE_ONLY = False - +TEST_USER_ID = "test_user" +TEST_USER: dict[str, Any] = {} +TEST_USER_NAME = f"{TEST_CLUSTER_NAME}/users/{TEST_USER_ID}" +TEST_BACKUP_ID = "test_backup_id" +TEST_BACKUP: dict[str, Any] = {} +TEST_BACKUP_NAME = f"projects/{TEST_GCP_PROJECT}/locations/{TEST_GCP_REGION}/backups/{TEST_BACKUP_ID}" TEST_RETRY = DEFAULT TEST_TIMEOUT = None TEST_METADATA = () @@ -307,3 +312,319 @@ def test_delete_cluster(self, mock_client): timeout=TEST_TIMEOUT, metadata=TEST_METADATA, ) + + @mock.patch(HOOK_PATH.format("AlloyDbHook.get_alloy_db_admin_client")) + def test_create_user(self, mock_client): + mock_create_user = mock_client.return_value.create_user + expected_result = mock_create_user.return_value + expected_parent = TEST_CLUSTER_NAME + mock_cluster_path = mock_client.return_value.cluster_path + mock_cluster_path.return_value = expected_parent + expected_request = { + "parent": expected_parent, + "user_id": TEST_USER_ID, + "user": TEST_USER, + "request_id": TEST_REQUEST_ID, + "validate_only": TEST_VALIDATE_ONLY, + } + + result = self.hook.create_user( + user_id=TEST_USER_ID, + cluster_id=TEST_CLUSTER_ID, + user=TEST_USER, + location=TEST_GCP_REGION, + project_id=TEST_GCP_PROJECT, + request_id=TEST_REQUEST_ID, + validate_only=TEST_VALIDATE_ONLY, + retry=TEST_RETRY, + timeout=TEST_TIMEOUT, + metadata=TEST_METADATA, + ) + + assert result == expected_result + mock_client.assert_called_once() + 
mock_cluster_path.assert_called_once_with(TEST_GCP_PROJECT, TEST_GCP_REGION, TEST_CLUSTER_ID) + mock_create_user.assert_called_once_with( + request=expected_request, + retry=TEST_RETRY, + timeout=TEST_TIMEOUT, + metadata=TEST_METADATA, + ) + + @mock.patch(HOOK_PATH.format("AlloyDbHook.get_alloy_db_admin_client")) + def test_get_user(self, mock_client): + mock_get_user = mock_client.return_value.get_user + mock_user_path = mock_client.return_value.user_path + mock_user_path.return_value = TEST_USER_NAME + expected_result = mock_get_user.return_value + + result = self.hook.get_user( + user_id=TEST_USER_ID, + cluster_id=TEST_CLUSTER_ID, + location=TEST_GCP_REGION, + project_id=TEST_GCP_PROJECT, + retry=TEST_RETRY, + timeout=TEST_TIMEOUT, + metadata=TEST_METADATA, + ) + + assert result == expected_result + mock_client.assert_called_once() + mock_user_path.assert_called_once_with( + TEST_GCP_PROJECT, TEST_GCP_REGION, TEST_CLUSTER_ID, TEST_USER_ID + ) + mock_get_user.assert_called_once_with( + request={"name": TEST_USER_NAME}, + retry=TEST_RETRY, + timeout=TEST_TIMEOUT, + metadata=TEST_METADATA, + ) + + @pytest.mark.parametrize( + "given_user, expected_user", + [ + (TEST_USER, {**deepcopy(TEST_USER), **{"name": TEST_USER_NAME}}), + (alloydb_v1.User(), {"name": TEST_USER_NAME}), + ({}, {"name": TEST_USER_NAME}), + ], + ) + @mock.patch(HOOK_PATH.format("deepcopy")) + @mock.patch(HOOK_PATH.format("alloydb_v1.User.to_dict")) + @mock.patch(HOOK_PATH.format("AlloyDbHook.get_alloy_db_admin_client")) + def test_update_user(self, mock_client, mock_to_dict, mock_deepcopy, given_user, expected_user): + mock_update_user = mock_client.return_value.update_user + expected_result = mock_update_user.return_value + mock_deepcopy.return_value = expected_user + mock_to_dict.return_value = expected_user + mock_user_path = mock_client.return_value.user_path + mock_user_path.return_value = expected_user + + expected_request = { + "update_mask": TEST_UPDATE_MASK, + "user": expected_user, + "request_id": TEST_REQUEST_ID, + "validate_only": TEST_VALIDATE_ONLY, + "allow_missing": TEST_ALLOW_MISSING, + } + + result = self.hook.update_user( + user_id=TEST_USER_ID, + cluster_id=TEST_CLUSTER_ID, + user=given_user, + location=TEST_GCP_REGION, + update_mask=TEST_UPDATE_MASK, + project_id=TEST_GCP_PROJECT, + allow_missing=TEST_ALLOW_MISSING, + request_id=TEST_REQUEST_ID, + validate_only=TEST_VALIDATE_ONLY, + retry=TEST_RETRY, + timeout=TEST_TIMEOUT, + metadata=TEST_METADATA, + ) + + assert result == expected_result + mock_user_path.assert_called_once_with( + TEST_GCP_PROJECT, TEST_GCP_REGION, TEST_CLUSTER_ID, TEST_USER_ID + ) + if isinstance(given_user, dict): + mock_deepcopy.assert_called_once_with(given_user) + assert not mock_to_dict.called + else: + assert not mock_deepcopy.called + mock_to_dict.assert_called_once_with(given_user) + mock_client.assert_called_once() + mock_update_user.assert_called_once_with( + request=expected_request, + retry=TEST_RETRY, + timeout=TEST_TIMEOUT, + metadata=TEST_METADATA, + ) + + @mock.patch(HOOK_PATH.format("AlloyDbHook.get_alloy_db_admin_client")) + def test_delete_user(self, mock_client): + mock_delete_user = mock_client.return_value.delete_user + expected_result = mock_delete_user.return_value + mock_user_path = mock_client.return_value.user_path + mock_user_path.return_value = TEST_USER_NAME + expected_request = { + "name": TEST_USER_NAME, + "request_id": TEST_REQUEST_ID, + "validate_only": TEST_VALIDATE_ONLY, + } + + result = self.hook.delete_user( + user_id=TEST_USER_ID, + 
cluster_id=TEST_CLUSTER_ID, + location=TEST_GCP_REGION, + project_id=TEST_GCP_PROJECT, + request_id=TEST_REQUEST_ID, + validate_only=TEST_VALIDATE_ONLY, + retry=TEST_RETRY, + timeout=TEST_TIMEOUT, + metadata=TEST_METADATA, + ) + + assert result == expected_result + mock_client.assert_called_once() + mock_user_path.assert_called_once_with( + TEST_GCP_PROJECT, TEST_GCP_REGION, TEST_CLUSTER_ID, TEST_USER_ID + ) + mock_delete_user.assert_called_once_with( + request=expected_request, + retry=TEST_RETRY, + timeout=TEST_TIMEOUT, + metadata=TEST_METADATA, + ) + + @mock.patch(HOOK_PATH.format("AlloyDbHook.get_alloy_db_admin_client")) + def test_create_backup(self, mock_client): + mock_create_backup = mock_client.return_value.create_backup + expected_result = mock_create_backup.return_value + expected_parent = f"projects/{TEST_GCP_PROJECT}/locations/{TEST_GCP_REGION}" + mock_common_location_path = mock_client.return_value.common_location_path + mock_common_location_path.return_value = expected_parent + expected_request = { + "parent": expected_parent, + "backup_id": TEST_BACKUP_ID, + "backup": TEST_BACKUP, + "request_id": TEST_REQUEST_ID, + "validate_only": TEST_VALIDATE_ONLY, + } + + result = self.hook.create_backup( + backup_id=TEST_BACKUP_ID, + backup=TEST_BACKUP, + location=TEST_GCP_REGION, + project_id=TEST_GCP_PROJECT, + request_id=TEST_REQUEST_ID, + validate_only=TEST_VALIDATE_ONLY, + retry=TEST_RETRY, + timeout=TEST_TIMEOUT, + metadata=TEST_METADATA, + ) + + assert result == expected_result + mock_client.assert_called_once() + mock_common_location_path.assert_called_once_with(TEST_GCP_PROJECT, TEST_GCP_REGION) + mock_create_backup.assert_called_once_with( + request=expected_request, + retry=TEST_RETRY, + timeout=TEST_TIMEOUT, + metadata=TEST_METADATA, + ) + + @mock.patch(HOOK_PATH.format("AlloyDbHook.get_alloy_db_admin_client")) + def test_get_backup(self, mock_client): + mock_get_backup = mock_client.return_value.get_backup + mock_cluster_backup = mock_client.return_value.backup_path + mock_cluster_backup.return_value = TEST_BACKUP_NAME + expected_result = mock_get_backup.return_value + + result = self.hook.get_backup( + backup_id=TEST_BACKUP_ID, + location=TEST_GCP_REGION, + project_id=TEST_GCP_PROJECT, + retry=TEST_RETRY, + timeout=TEST_TIMEOUT, + metadata=TEST_METADATA, + ) + + assert result == expected_result + mock_client.assert_called_once() + mock_cluster_backup.assert_called_once_with(TEST_GCP_PROJECT, TEST_GCP_REGION, TEST_BACKUP_ID) + mock_get_backup.assert_called_once_with( + request={"name": TEST_BACKUP_NAME}, + retry=TEST_RETRY, + timeout=TEST_TIMEOUT, + metadata=TEST_METADATA, + ) + + @pytest.mark.parametrize( + "given_backup, expected_backup", + [ + (TEST_BACKUP, {**deepcopy(TEST_BACKUP), **{"name": TEST_BACKUP_NAME}}), + (alloydb_v1.Backup(), {"name": TEST_BACKUP_NAME}), + ({}, {"name": TEST_BACKUP_NAME}), + ], + ) + @mock.patch(HOOK_PATH.format("deepcopy")) + @mock.patch(HOOK_PATH.format("alloydb_v1.Backup.to_dict")) + @mock.patch(HOOK_PATH.format("AlloyDbHook.get_alloy_db_admin_client")) + def test_update_backup(self, mock_client, mock_to_dict, mock_deepcopy, given_backup, expected_backup): + mock_update_backup = mock_client.return_value.update_backup + expected_result = mock_update_backup.return_value + mock_deepcopy.return_value = expected_backup + mock_to_dict.return_value = expected_backup + mock_backup_path = mock_client.return_value.backup_path + mock_backup_path.return_value = expected_backup + + expected_request = { + "update_mask": TEST_UPDATE_MASK, + "backup": 
expected_backup, + "request_id": TEST_REQUEST_ID, + "validate_only": TEST_VALIDATE_ONLY, + "allow_missing": TEST_ALLOW_MISSING, + } + + result = self.hook.update_backup( + backup_id=TEST_BACKUP_ID, + backup=given_backup, + location=TEST_GCP_REGION, + update_mask=TEST_UPDATE_MASK, + project_id=TEST_GCP_PROJECT, + allow_missing=TEST_ALLOW_MISSING, + request_id=TEST_REQUEST_ID, + validate_only=TEST_VALIDATE_ONLY, + retry=TEST_RETRY, + timeout=TEST_TIMEOUT, + metadata=TEST_METADATA, + ) + + assert result == expected_result + mock_backup_path.assert_called_once_with(TEST_GCP_PROJECT, TEST_GCP_REGION, TEST_BACKUP_ID) + if isinstance(given_backup, dict): + mock_deepcopy.assert_called_once_with(given_backup) + assert not mock_to_dict.called + else: + assert not mock_deepcopy.called + mock_to_dict.assert_called_once_with(given_backup) + mock_client.assert_called_once() + mock_update_backup.assert_called_once_with( + request=expected_request, + retry=TEST_RETRY, + timeout=TEST_TIMEOUT, + metadata=TEST_METADATA, + ) + + @mock.patch(HOOK_PATH.format("AlloyDbHook.get_alloy_db_admin_client")) + def test_delete_backup(self, mock_client): + mock_delete_backup = mock_client.return_value.delete_backup + expected_result = mock_delete_backup.return_value + mock_backup_path = mock_client.return_value.backup_path + mock_backup_path.return_value = TEST_BACKUP_NAME + expected_request = { + "name": TEST_BACKUP_NAME, + "request_id": TEST_REQUEST_ID, + "validate_only": TEST_VALIDATE_ONLY, + } + + result = self.hook.delete_backup( + backup_id=TEST_BACKUP_ID, + location=TEST_GCP_REGION, + project_id=TEST_GCP_PROJECT, + request_id=TEST_REQUEST_ID, + validate_only=TEST_VALIDATE_ONLY, + retry=TEST_RETRY, + timeout=TEST_TIMEOUT, + metadata=TEST_METADATA, + ) + + assert result == expected_result + mock_client.assert_called_once() + mock_backup_path.assert_called_once_with(TEST_GCP_PROJECT, TEST_GCP_REGION, TEST_BACKUP_ID) + mock_delete_backup.assert_called_once_with( + request=expected_request, + retry=TEST_RETRY, + timeout=TEST_TIMEOUT, + metadata=TEST_METADATA, + ) diff --git a/providers/tests/google/cloud/links/test_alloy_db.py b/providers/tests/google/cloud/links/test_alloy_db.py index 26eebc99a4c88..be4c6d447163a 100644 --- a/providers/tests/google/cloud/links/test_alloy_db.py +++ b/providers/tests/google/cloud/links/test_alloy_db.py @@ -19,7 +19,10 @@ from unittest import mock -from airflow.providers.google.cloud.links.alloy_db import AlloyDBClusterLink +from airflow.providers.google.cloud.links.alloy_db import ( + AlloyDBClusterLink, + AlloyDBUsersLink, +) TEST_LOCATION = "test-location" TEST_CLUSTER_ID = "test-cluster-id" @@ -29,6 +32,11 @@ EXPECTED_ALLOY_DB_CLUSTER_LINK_FORMAT_STR = ( "/alloydb/locations/{location_id}/clusters/{cluster_id}?project={project_id}" ) +EXPECTED_ALLOY_DB_USERS_LINK_NAME = "AlloyDB Users" +EXPECTED_ALLOY_DB_USERS_LINK_KEY = "alloy_db_users" +EXPECTED_ALLOY_DB_USERS_LINK_FORMAT_STR = ( + "/alloydb/locations/{location_id}/clusters/{cluster_id}/users?project={project_id}" +) class TestAlloyDBClusterLink: @@ -57,3 +65,31 @@ def test_persist(self): "project_id": TEST_PROJECT_ID, }, ) + + +class TestAlloyDBUsersLink: + def test_class_attributes(self): + assert AlloyDBUsersLink.key == EXPECTED_ALLOY_DB_USERS_LINK_KEY + assert AlloyDBUsersLink.name == EXPECTED_ALLOY_DB_USERS_LINK_NAME + assert AlloyDBUsersLink.format_str == EXPECTED_ALLOY_DB_USERS_LINK_FORMAT_STR + + def test_persist(self): + mock_context, mock_task_instance = mock.MagicMock(), mock.MagicMock() + + AlloyDBUsersLink.persist( + 
context=mock_context, + task_instance=mock_task_instance, + location_id=TEST_LOCATION, + cluster_id=TEST_CLUSTER_ID, + project_id=TEST_PROJECT_ID, + ) + + mock_task_instance.xcom_push.assert_called_once_with( + mock_context, + key=EXPECTED_ALLOY_DB_USERS_LINK_KEY, + value={ + "location_id": TEST_LOCATION, + "cluster_id": TEST_CLUSTER_ID, + "project_id": TEST_PROJECT_ID, + }, + ) diff --git a/providers/tests/google/cloud/operators/test_alloy_db.py b/providers/tests/google/cloud/operators/test_alloy_db.py index c2ed4a3ae8352..9cc9e5e04ace7 100644 --- a/providers/tests/google/cloud/operators/test_alloy_db.py +++ b/providers/tests/google/cloud/operators/test_alloy_db.py @@ -28,12 +28,18 @@ from airflow.exceptions import AirflowException from airflow.providers.google.cloud.operators.alloy_db import ( AlloyDBBaseOperator, + AlloyDBCreateBackupOperator, AlloyDBCreateClusterOperator, AlloyDBCreateInstanceOperator, + AlloyDBCreateUserOperator, + AlloyDBDeleteBackupOperator, AlloyDBDeleteClusterOperator, AlloyDBDeleteInstanceOperator, + AlloyDBDeleteUserOperator, + AlloyDBUpdateBackupOperator, AlloyDBUpdateClusterOperator, AlloyDBUpdateInstanceOperator, + AlloyDBUpdateUserOperator, AlloyDBWriteBaseOperator, ) @@ -61,6 +67,12 @@ TEST_INSTANCE_ID = "test_instance_id" TEST_INSTANCE: dict[str, Any] = {} +TEST_USER_ID = "test_user" +TEST_USER: dict[str, Any] = {} + +TEST_BACKUP_ID = "test_backup_id" +TEST_BACKUP: dict[str, Any] = {} + OPERATOR_MODULE_PATH = "airflow.providers.google.cloud.operators.alloy_db.{}" ALLOY_DB_HOOK_PATH = OPERATOR_MODULE_PATH.format("AlloyDbHook") BASE_WRITE_CLUSTER_OPERATOR_PATH = OPERATOR_MODULE_PATH.format("AlloyDBWriteBaseOperator.{}") @@ -72,6 +84,14 @@ UPDATE_INSTANCE_OPERATOR_PATH = OPERATOR_MODULE_PATH.format("AlloyDBUpdateInstanceOperator.{}") DELETE_INSTANCE_OPERATOR_PATH = OPERATOR_MODULE_PATH.format("AlloyDBDeleteInstanceOperator.{}") +CREATE_USER_OPERATOR_PATH = OPERATOR_MODULE_PATH.format("AlloyDBCreateUserOperator.{}") +UPDATE_USER_OPERATOR_PATH = OPERATOR_MODULE_PATH.format("AlloyDBUpdateUserOperator.{}") +DELETE_USER_OPERATOR_PATH = OPERATOR_MODULE_PATH.format("AlloyDBDeleteUserOperator.{}") + +CREATE_BACKUP_OPERATOR_PATH = OPERATOR_MODULE_PATH.format("AlloyDBCreateBackupOperator.{}") +UPDATE_BACKUP_OPERATOR_PATH = OPERATOR_MODULE_PATH.format("AlloyDBUpdateBackupOperator.{}") +DELETE_BACKUP_OPERATOR_PATH = OPERATOR_MODULE_PATH.format("AlloyDBDeleteBackupOperator.{}") + class TestAlloyDBBaseOperator: def setup_method(self): @@ -1439,3 +1459,996 @@ def test_execute_exception(self, mock_hook, mock_log, mock_get_operation_result) ) assert not mock_get_operation_result.called mock_log.info.assert_called_once_with("Deleting an AlloyDB instance.") + + +class TestAlloyDBCreateUserOperator: + def setup_method(self): + self.operator = AlloyDBCreateUserOperator( + task_id=TEST_TASK_ID, + user_id=TEST_USER_ID, + user_configuration=TEST_USER, + cluster_id=TEST_CLUSTER_ID, + project_id=TEST_GCP_PROJECT, + location=TEST_GCP_REGION, + gcp_conn_id=TEST_GCP_CONN_ID, + request_id=TEST_REQUEST_ID, + validate_request=TEST_VALIDATE_ONLY, + retry=TEST_RETRY, + timeout=TEST_TIMEOUT, + metadata=TEST_METADATA, + impersonation_chain=TEST_IMPERSONATION_CHAIN, + ) + + def test_init(self): + assert self.operator.user_id == TEST_USER_ID + assert self.operator.user_configuration == TEST_USER + assert self.operator.cluster_id == TEST_CLUSTER_ID + + def test_template_fields(self): + expected_template_fields = { + "cluster_id", + "user_id", + "user_configuration", + } | 
set(AlloyDBWriteBaseOperator.template_fields) + assert set(AlloyDBCreateUserOperator.template_fields) == expected_template_fields + + @mock.patch(CREATE_USER_OPERATOR_PATH.format("log")) + @mock.patch(ALLOY_DB_HOOK_PATH, new_callable=mock.PropertyMock) + def test_get_user_not_found(self, mock_hook, mock_log): + mock_get_user = mock_hook.return_value.get_user + mock_get_user.side_effect = NotFound("Not found") + + result = self.operator._get_user() + + mock_get_user.assert_called_once_with( + cluster_id=TEST_CLUSTER_ID, + user_id=TEST_USER_ID, + location=TEST_GCP_REGION, + project_id=TEST_GCP_PROJECT, + ) + mock_log.info.assert_has_calls( + [ + call("Checking if the user %s exists already...", TEST_USER_ID), + call("The user %s does not exist yet.", TEST_USER_ID), + ] + ) + assert result is None + + @mock.patch(CREATE_USER_OPERATOR_PATH.format("log")) + @mock.patch(ALLOY_DB_HOOK_PATH, new_callable=mock.PropertyMock) + def test_get_user_exception(self, mock_hook, mock_log): + mock_get_user = mock_hook.return_value.get_user + mock_get_user.side_effect = Exception("Test exception") + + with pytest.raises(AirflowException): + self.operator._get_user() + + mock_get_user.assert_called_once_with( + cluster_id=TEST_CLUSTER_ID, + user_id=TEST_USER_ID, + location=TEST_GCP_REGION, + project_id=TEST_GCP_PROJECT, + ) + mock_log.info.assert_called_once_with("Checking if the user %s exists already...", TEST_USER_ID) + + @mock.patch(OPERATOR_MODULE_PATH.format("alloydb_v1.User.to_dict")) + @mock.patch(CREATE_USER_OPERATOR_PATH.format("log")) + @mock.patch(ALLOY_DB_HOOK_PATH, new_callable=mock.PropertyMock) + def test_get_user(self, mock_hook, mock_log, mock_to_dict): + mock_get_user = mock_hook.return_value.get_user + mock_user = mock_get_user.return_value + expected_result = mock_to_dict.return_value + + result = self.operator._get_user() + + mock_get_user.assert_called_once_with( + user_id=TEST_USER_ID, + cluster_id=TEST_CLUSTER_ID, + location=TEST_GCP_REGION, + project_id=TEST_GCP_PROJECT, + ) + mock_log.info.assert_has_calls( + [ + call("Checking if the user %s exists already...", TEST_USER_ID), + call("AlloyDB user %s already exists in the cluster %s.", TEST_USER_ID, TEST_CLUSTER_ID), + ] + ) + mock_to_dict.assert_called_once_with(mock_user) + assert result == expected_result + + @mock.patch(OPERATOR_MODULE_PATH.format("AlloyDBUsersLink")) + @mock.patch(OPERATOR_MODULE_PATH.format("alloydb_v1.User.to_dict")) + @mock.patch(CREATE_USER_OPERATOR_PATH.format("_get_user")) + @mock.patch(CREATE_USER_OPERATOR_PATH.format("log")) + @mock.patch(ALLOY_DB_HOOK_PATH, new_callable=mock.PropertyMock) + def test_execute(self, mock_hook, mock_log, mock_get_user, mock_to_dict, mock_link): + mock_get_user.return_value = None + mock_create_user = mock_hook.return_value.create_user + mock_user = mock_create_user.return_value + + expected_result = mock_to_dict.return_value + mock_context = mock.MagicMock() + + result = self.operator.execute(context=mock_context) + + mock_link.persist.assert_called_once_with( + context=mock_context, + task_instance=self.operator, + location_id=TEST_GCP_REGION, + cluster_id=TEST_CLUSTER_ID, + project_id=TEST_GCP_PROJECT, + ) + mock_log.info.assert_has_calls( + [ + call("Creating an AlloyDB user."), + call("AlloyDB user %s was successfully created.", TEST_USER_ID), + ] + ) + mock_get_user.assert_called_once() + mock_create_user.assert_called_once_with( + user_id=TEST_USER_ID, + cluster_id=TEST_CLUSTER_ID, + user=TEST_USER, + location=TEST_GCP_REGION, + project_id=TEST_GCP_PROJECT, + 
request_id=TEST_REQUEST_ID,
+            validate_only=TEST_VALIDATE_ONLY,
+            retry=TEST_RETRY,
+            timeout=TEST_TIMEOUT,
+            metadata=TEST_METADATA,
+        )
+        mock_to_dict.assert_called_once_with(mock_user)
+        assert result == expected_result
+
+    @mock.patch(OPERATOR_MODULE_PATH.format("AlloyDBUsersLink"))
+    @mock.patch(OPERATOR_MODULE_PATH.format("alloydb_v1.User.to_dict"))
+    @mock.patch(CREATE_USER_OPERATOR_PATH.format("_get_user"))
+    @mock.patch(CREATE_USER_OPERATOR_PATH.format("log"))
+    @mock.patch(ALLOY_DB_HOOK_PATH, new_callable=mock.PropertyMock)
+    def test_execute_validate_request(self, mock_hook, mock_log, mock_get_user, mock_to_dict, mock_link):
+        mock_get_user.return_value = None
+        mock_create_user = mock_hook.return_value.create_user
+
+        mock_context = mock.MagicMock()
+        self.operator.validate_request = True
+
+        result = self.operator.execute(context=mock_context)
+
+        mock_link.persist.assert_called_once_with(
+            context=mock_context,
+            task_instance=self.operator,
+            location_id=TEST_GCP_REGION,
+            cluster_id=TEST_CLUSTER_ID,
+            project_id=TEST_GCP_PROJECT,
+        )
+
+        mock_log.info.assert_called_once_with("Validating a Create AlloyDB user request.")
+        mock_get_user.assert_called_once()
+        mock_create_user.assert_called_once_with(
+            user_id=TEST_USER_ID,
+            cluster_id=TEST_CLUSTER_ID,
+            user=TEST_USER,
+            location=TEST_GCP_REGION,
+            project_id=TEST_GCP_PROJECT,
+            request_id=TEST_REQUEST_ID,
+            validate_only=True,
+            retry=TEST_RETRY,
+            timeout=TEST_TIMEOUT,
+            metadata=TEST_METADATA,
+        )
+        assert not mock_to_dict.called
+        assert result is None
+
+    @mock.patch(OPERATOR_MODULE_PATH.format("AlloyDBUsersLink"))
+    @mock.patch(CREATE_USER_OPERATOR_PATH.format("_get_user"))
+    @mock.patch(CREATE_USER_OPERATOR_PATH.format("log"))
+    @mock.patch(ALLOY_DB_HOOK_PATH, new_callable=mock.PropertyMock)
+    def test_execute_already_exists(self, mock_hook, mock_log, mock_get_user, mock_link):
+        expected_result = mock_get_user.return_value
+        mock_create_user = mock_hook.return_value.create_user
+        mock_context = mock.MagicMock()
+
+        result = self.operator.execute(context=mock_context)
+
+        mock_link.persist.assert_called_once_with(
+            context=mock_context,
+            task_instance=self.operator,
+            location_id=TEST_GCP_REGION,
+            cluster_id=TEST_CLUSTER_ID,
+            project_id=TEST_GCP_PROJECT,
+        )
+
+        assert not mock_log.info.called
+        mock_get_user.assert_called_once()
+        assert not mock_create_user.called
+        assert result == expected_result
+
+    @mock.patch(OPERATOR_MODULE_PATH.format("AlloyDBUsersLink"))
+    @mock.patch(OPERATOR_MODULE_PATH.format("alloydb_v1.User.to_dict"))
+    @mock.patch(CREATE_USER_OPERATOR_PATH.format("_get_user"))
+    @mock.patch(CREATE_USER_OPERATOR_PATH.format("log"))
+    @mock.patch(ALLOY_DB_HOOK_PATH, new_callable=mock.PropertyMock)
+    def test_execute_exception(self, mock_hook, mock_log, mock_get_user, mock_to_dict, mock_link):
+        mock_get_user.return_value = None
+        mock_create_user = mock_hook.return_value.create_user
+        mock_create_user.side_effect = Exception()
+        mock_context = mock.MagicMock()
+
+        with pytest.raises(AirflowException):
+            self.operator.execute(context=mock_context)
+
+        mock_link.persist.assert_called_once_with(
+            context=mock_context,
+            task_instance=self.operator,
+            location_id=TEST_GCP_REGION,
+            cluster_id=TEST_CLUSTER_ID,
+            project_id=TEST_GCP_PROJECT,
+        )
+
+        mock_log.info.assert_called_once_with("Creating an AlloyDB user.")
+        mock_get_user.assert_called_once()
+        mock_create_user.assert_called_once_with(
+            user_id=TEST_USER_ID,
+            cluster_id=TEST_CLUSTER_ID,
+            user=TEST_USER,
+            location=TEST_GCP_REGION,
+            
project_id=TEST_GCP_PROJECT, + request_id=TEST_REQUEST_ID, + validate_only=TEST_VALIDATE_ONLY, + retry=TEST_RETRY, + timeout=TEST_TIMEOUT, + metadata=TEST_METADATA, + ) + assert not mock_to_dict.called + + +class TestAlloyDBUpdateUserOperator: + def setup_method(self): + self.operator = AlloyDBUpdateUserOperator( + task_id=TEST_TASK_ID, + user_id=TEST_USER_ID, + cluster_id=TEST_CLUSTER_ID, + user_configuration=TEST_USER, + update_mask=TEST_UPDATE_MASK, + allow_missing=TEST_ALLOW_MISSING, + project_id=TEST_GCP_PROJECT, + location=TEST_GCP_REGION, + gcp_conn_id=TEST_GCP_CONN_ID, + request_id=TEST_REQUEST_ID, + validate_request=TEST_VALIDATE_ONLY, + retry=TEST_RETRY, + timeout=TEST_TIMEOUT, + metadata=TEST_METADATA, + impersonation_chain=TEST_IMPERSONATION_CHAIN, + ) + + def test_init(self): + assert self.operator.user_id == TEST_USER_ID + assert self.operator.cluster_id == TEST_CLUSTER_ID + assert self.operator.user_configuration == TEST_USER + assert self.operator.update_mask == TEST_UPDATE_MASK + assert self.operator.allow_missing == TEST_ALLOW_MISSING + + def test_template_fields(self): + expected_template_fields = { + "cluster_id", + "user_id", + "user_configuration", + "update_mask", + "allow_missing", + } | set(AlloyDBWriteBaseOperator.template_fields) + assert set(AlloyDBUpdateUserOperator.template_fields) == expected_template_fields + + @mock.patch(OPERATOR_MODULE_PATH.format("AlloyDBUsersLink")) + @mock.patch(OPERATOR_MODULE_PATH.format("alloydb_v1.User.to_dict")) + @mock.patch(UPDATE_USER_OPERATOR_PATH.format("log")) + @mock.patch(ALLOY_DB_HOOK_PATH, new_callable=mock.PropertyMock) + def test_execute(self, mock_hook, mock_log, mock_to_dict, mock_link): + mock_update_user = mock_hook.return_value.update_user + mock_user = mock_update_user.return_value + expected_result = mock_to_dict.return_value + mock_context = mock.MagicMock() + + result = self.operator.execute(context=mock_context) + + mock_link.persist.assert_called_once_with( + context=mock_context, + task_instance=self.operator, + location_id=TEST_GCP_REGION, + cluster_id=TEST_CLUSTER_ID, + project_id=TEST_GCP_PROJECT, + ) + mock_update_user.assert_called_once_with( + cluster_id=TEST_CLUSTER_ID, + user_id=TEST_USER_ID, + project_id=TEST_GCP_PROJECT, + location=TEST_GCP_REGION, + user=TEST_USER, + update_mask=TEST_UPDATE_MASK, + allow_missing=TEST_ALLOW_MISSING, + request_id=TEST_REQUEST_ID, + validate_only=TEST_VALIDATE_ONLY, + retry=TEST_RETRY, + timeout=TEST_TIMEOUT, + metadata=TEST_METADATA, + ) + mock_to_dict.assert_called_once_with(mock_user) + assert result == expected_result + mock_log.info.assert_has_calls( + [ + call("Updating an AlloyDB user."), + call("AlloyDB user %s was successfully updated.", TEST_USER_ID), + ] + ) + + @mock.patch(OPERATOR_MODULE_PATH.format("AlloyDBUsersLink")) + @mock.patch(OPERATOR_MODULE_PATH.format("alloydb_v1.User.to_dict")) + @mock.patch(UPDATE_USER_OPERATOR_PATH.format("log")) + @mock.patch(ALLOY_DB_HOOK_PATH, new_callable=mock.PropertyMock) + def test_execute_validate_request(self, mock_hook, mock_log, mock_to_dict, mock_link): + mock_update_user = mock_hook.return_value.update_user + + expected_message = "Validating an Update AlloyDB user request." 
+ mock_context = mock.MagicMock() + self.operator.validate_request = True + + result = self.operator.execute(context=mock_context) + + mock_log.info.assert_called_once_with(expected_message) + mock_update_user.assert_called_once_with( + cluster_id=TEST_CLUSTER_ID, + user_id=TEST_USER_ID, + project_id=TEST_GCP_PROJECT, + location=TEST_GCP_REGION, + user=TEST_USER, + update_mask=TEST_UPDATE_MASK, + allow_missing=TEST_ALLOW_MISSING, + request_id=TEST_REQUEST_ID, + validate_only=True, + retry=TEST_RETRY, + timeout=TEST_TIMEOUT, + metadata=TEST_METADATA, + ) + assert not mock_to_dict.called + mock_link.persist.assert_called_once_with( + context=mock_context, + task_instance=self.operator, + location_id=TEST_GCP_REGION, + cluster_id=TEST_CLUSTER_ID, + project_id=TEST_GCP_PROJECT, + ) + assert result is None + + @mock.patch(OPERATOR_MODULE_PATH.format("AlloyDBUsersLink")) + @mock.patch(OPERATOR_MODULE_PATH.format("alloydb_v1.User.to_dict")) + @mock.patch(UPDATE_USER_OPERATOR_PATH.format("log")) + @mock.patch(ALLOY_DB_HOOK_PATH, new_callable=mock.PropertyMock) + def test_execute_exception(self, mock_hook, mock_log, mock_to_dict, mock_link): + mock_update_user = mock_hook.return_value.update_user + mock_update_user.side_effect = Exception + + mock_context = mock.MagicMock() + + with pytest.raises(AirflowException): + self.operator.execute(context=mock_context) + + mock_link.persist.assert_called_once_with( + context=mock_context, + task_instance=self.operator, + location_id=TEST_GCP_REGION, + cluster_id=TEST_CLUSTER_ID, + project_id=TEST_GCP_PROJECT, + ) + mock_update_user.assert_called_once_with( + cluster_id=TEST_CLUSTER_ID, + user_id=TEST_USER_ID, + project_id=TEST_GCP_PROJECT, + location=TEST_GCP_REGION, + user=TEST_USER, + update_mask=TEST_UPDATE_MASK, + allow_missing=TEST_ALLOW_MISSING, + request_id=TEST_REQUEST_ID, + validate_only=TEST_VALIDATE_ONLY, + retry=TEST_RETRY, + timeout=TEST_TIMEOUT, + metadata=TEST_METADATA, + ) + assert not mock_to_dict.called + mock_log.info.assert_called_once_with("Updating an AlloyDB user.") + + +class TestAlloyDBDeleteUserOperator: + def setup_method(self): + self.operator = AlloyDBDeleteUserOperator( + task_id=TEST_TASK_ID, + user_id=TEST_USER_ID, + cluster_id=TEST_CLUSTER_ID, + project_id=TEST_GCP_PROJECT, + location=TEST_GCP_REGION, + gcp_conn_id=TEST_GCP_CONN_ID, + request_id=TEST_REQUEST_ID, + validate_request=TEST_VALIDATE_ONLY, + retry=TEST_RETRY, + timeout=TEST_TIMEOUT, + metadata=TEST_METADATA, + impersonation_chain=TEST_IMPERSONATION_CHAIN, + ) + + def test_init(self): + assert self.operator.cluster_id == TEST_CLUSTER_ID + assert self.operator.user_id == TEST_USER_ID + + def test_template_fields(self): + expected_template_fields = {"user_id", "cluster_id"} | set(AlloyDBWriteBaseOperator.template_fields) + assert set(AlloyDBDeleteUserOperator.template_fields) == expected_template_fields + + @mock.patch(DELETE_USER_OPERATOR_PATH.format("log")) + @mock.patch(ALLOY_DB_HOOK_PATH, new_callable=mock.PropertyMock) + def test_execute(self, mock_hook, mock_log): + mock_delete_user = mock_hook.return_value.delete_user + mock_context = mock.MagicMock() + + result = self.operator.execute(context=mock_context) + + mock_delete_user.assert_called_once_with( + user_id=TEST_USER_ID, + cluster_id=TEST_CLUSTER_ID, + project_id=TEST_GCP_PROJECT, + location=TEST_GCP_REGION, + request_id=TEST_REQUEST_ID, + validate_only=TEST_VALIDATE_ONLY, + retry=TEST_RETRY, + timeout=TEST_TIMEOUT, + metadata=TEST_METADATA, + ) + assert result is None + mock_log.info.assert_has_calls( + 
[ + call("Deleting an AlloyDB user."), + call("AlloyDB user %s was successfully removed.", TEST_USER_ID), + ] + ) + + @mock.patch(DELETE_USER_OPERATOR_PATH.format("log")) + @mock.patch(ALLOY_DB_HOOK_PATH, new_callable=mock.PropertyMock) + def test_execute_validate_request(self, mock_hook, mock_log): + mock_delete_user = mock_hook.return_value.delete_user + mock_context = mock.MagicMock() + self.operator.validate_request = True + + result = self.operator.execute(context=mock_context) + + mock_delete_user.assert_called_once_with( + user_id=TEST_USER_ID, + cluster_id=TEST_CLUSTER_ID, + project_id=TEST_GCP_PROJECT, + location=TEST_GCP_REGION, + request_id=TEST_REQUEST_ID, + validate_only=True, + retry=TEST_RETRY, + timeout=TEST_TIMEOUT, + metadata=TEST_METADATA, + ) + assert result is None + mock_log.info.assert_called_once_with("Validating a Delete AlloyDB user request.") + + @mock.patch(DELETE_USER_OPERATOR_PATH.format("log")) + @mock.patch(ALLOY_DB_HOOK_PATH, new_callable=mock.PropertyMock) + def test_execute_exception(self, mock_hook, mock_log): + mock_delete_user = mock_hook.return_value.delete_user + mock_delete_user.side_effect = Exception + mock_context = mock.MagicMock() + + with pytest.raises(AirflowException): + self.operator.execute(context=mock_context) + + mock_delete_user.assert_called_once_with( + user_id=TEST_USER_ID, + cluster_id=TEST_CLUSTER_ID, + project_id=TEST_GCP_PROJECT, + location=TEST_GCP_REGION, + request_id=TEST_REQUEST_ID, + validate_only=TEST_VALIDATE_ONLY, + retry=TEST_RETRY, + timeout=TEST_TIMEOUT, + metadata=TEST_METADATA, + ) + mock_log.info.assert_called_once_with("Deleting an AlloyDB user.") + + +class TestAlloyDBCreateBackupOperator: + def setup_method(self): + self.operator = AlloyDBCreateBackupOperator( + task_id=TEST_TASK_ID, + backup_id=TEST_BACKUP_ID, + backup_configuration=TEST_BACKUP, + project_id=TEST_GCP_PROJECT, + location=TEST_GCP_REGION, + gcp_conn_id=TEST_GCP_CONN_ID, + request_id=TEST_REQUEST_ID, + validate_request=TEST_VALIDATE_ONLY, + retry=TEST_RETRY, + timeout=TEST_TIMEOUT, + metadata=TEST_METADATA, + impersonation_chain=TEST_IMPERSONATION_CHAIN, + ) + + def test_init(self): + assert self.operator.backup_id == TEST_BACKUP_ID + assert self.operator.backup_configuration == TEST_BACKUP + + def test_template_fields(self): + expected_template_fields = { + "backup_id", + "backup_configuration", + } | set(AlloyDBWriteBaseOperator.template_fields) + assert set(AlloyDBCreateBackupOperator.template_fields) == expected_template_fields + + @mock.patch(CREATE_BACKUP_OPERATOR_PATH.format("log")) + @mock.patch(ALLOY_DB_HOOK_PATH, new_callable=mock.PropertyMock) + def test_get_backup_not_found(self, mock_hook, mock_log): + mock_get_backup = mock_hook.return_value.get_backup + mock_get_backup.side_effect = NotFound("Not found") + + result = self.operator._get_backup() + + mock_get_backup.assert_called_once_with( + backup_id=TEST_BACKUP_ID, + location=TEST_GCP_REGION, + project_id=TEST_GCP_PROJECT, + ) + mock_log.info.assert_has_calls( + [ + call("Checking if the backup %s exists already...", TEST_BACKUP_ID), + call("The backup %s does not exist yet.", TEST_BACKUP_ID), + ] + ) + assert result is None + + @mock.patch(CREATE_BACKUP_OPERATOR_PATH.format("log")) + @mock.patch(ALLOY_DB_HOOK_PATH, new_callable=mock.PropertyMock) + def test_get_backup_exception(self, mock_hook, mock_log): + mock_get_backup = mock_hook.return_value.get_backup + mock_get_backup.side_effect = Exception("Test exception") + + with pytest.raises(AirflowException): + 
self.operator._get_backup() + + mock_get_backup.assert_called_once_with( + backup_id=TEST_BACKUP_ID, + location=TEST_GCP_REGION, + project_id=TEST_GCP_PROJECT, + ) + mock_log.info.assert_called_once_with("Checking if the backup %s exists already...", TEST_BACKUP_ID) + + @mock.patch(OPERATOR_MODULE_PATH.format("alloydb_v1.Backup.to_dict")) + @mock.patch(CREATE_BACKUP_OPERATOR_PATH.format("log")) + @mock.patch(ALLOY_DB_HOOK_PATH, new_callable=mock.PropertyMock) + def test_get_backup(self, mock_hook, mock_log, mock_to_dict): + mock_get_backup = mock_hook.return_value.get_backup + mock_instance = mock_get_backup.return_value + expected_result = mock_to_dict.return_value + + result = self.operator._get_backup() + + mock_get_backup.assert_called_once_with( + backup_id=TEST_BACKUP_ID, + location=TEST_GCP_REGION, + project_id=TEST_GCP_PROJECT, + ) + mock_log.info.assert_has_calls( + [ + call("Checking if the backup %s exists already...", TEST_BACKUP_ID), + call("AlloyDB backup %s already exists.", TEST_BACKUP_ID), + ] + ) + mock_to_dict.assert_called_once_with(mock_instance) + assert result == expected_result + + @mock.patch(OPERATOR_MODULE_PATH.format("AlloyDBBackupsLink")) + @mock.patch(OPERATOR_MODULE_PATH.format("alloydb_v1.Backup.to_dict")) + @mock.patch(CREATE_BACKUP_OPERATOR_PATH.format("_get_backup")) + @mock.patch(CREATE_BACKUP_OPERATOR_PATH.format("get_operation_result")) + @mock.patch(CREATE_BACKUP_OPERATOR_PATH.format("log")) + @mock.patch(ALLOY_DB_HOOK_PATH, new_callable=mock.PropertyMock) + def test_execute( + self, mock_hook, mock_log, mock_get_operation_result, mock_get_backup, mock_to_dict, mock_link + ): + mock_get_backup.return_value = None + mock_create_backup = mock_hook.return_value.create_backup + mock_operation = mock_create_backup.return_value + mock_operation_result = mock_get_operation_result.return_value + + expected_result = mock_to_dict.return_value + mock_context = mock.MagicMock() + + result = self.operator.execute(context=mock_context) + + mock_link.persist.assert_called_once_with( + context=mock_context, + task_instance=self.operator, + project_id=TEST_GCP_PROJECT, + ) + + mock_log.info.assert_called_once_with("Creating an AlloyDB backup.") + mock_get_backup.assert_called_once() + mock_create_backup.assert_called_once_with( + backup_id=TEST_BACKUP_ID, + backup=TEST_BACKUP, + location=TEST_GCP_REGION, + project_id=TEST_GCP_PROJECT, + request_id=TEST_REQUEST_ID, + validate_only=TEST_VALIDATE_ONLY, + retry=TEST_RETRY, + timeout=TEST_TIMEOUT, + metadata=TEST_METADATA, + ) + mock_to_dict.assert_called_once_with(mock_operation_result) + mock_get_operation_result.assert_called_once_with(mock_operation) + + assert result == expected_result + + @mock.patch(OPERATOR_MODULE_PATH.format("AlloyDBBackupsLink")) + @mock.patch(OPERATOR_MODULE_PATH.format("alloydb_v1.Backup.to_dict")) + @mock.patch(CREATE_BACKUP_OPERATOR_PATH.format("_get_backup")) + @mock.patch(CREATE_BACKUP_OPERATOR_PATH.format("get_operation_result")) + @mock.patch(CREATE_BACKUP_OPERATOR_PATH.format("log")) + @mock.patch(ALLOY_DB_HOOK_PATH, new_callable=mock.PropertyMock) + def test_execute_validate_request( + self, mock_hook, mock_log, mock_get_operation_result, mock_get_backup, mock_to_dict, mock_link + ): + mock_get_backup.return_value = None + mock_create_backup = mock_hook.return_value.create_backup + mock_operation = mock_create_backup.return_value + mock_get_operation_result.return_value = None + + mock_context = mock.MagicMock() + self.operator.validate_request = True + + result = 
self.operator.execute(context=mock_context)
+
+        mock_link.persist.assert_called_once_with(
+            context=mock_context,
+            task_instance=self.operator,
+            project_id=TEST_GCP_PROJECT,
+        )
+
+        mock_log.info.assert_called_once_with("Validating a Create AlloyDB backup request.")
+        mock_get_backup.assert_called_once()
+        mock_create_backup.assert_called_once_with(
+            backup_id=TEST_BACKUP_ID,
+            backup=TEST_BACKUP,
+            location=TEST_GCP_REGION,
+            project_id=TEST_GCP_PROJECT,
+            request_id=TEST_REQUEST_ID,
+            validate_only=True,
+            retry=TEST_RETRY,
+            timeout=TEST_TIMEOUT,
+            metadata=TEST_METADATA,
+        )
+        assert not mock_to_dict.called
+        mock_get_operation_result.assert_called_once_with(mock_operation)
+        assert result is None
+
+    @mock.patch(OPERATOR_MODULE_PATH.format("AlloyDBBackupsLink"))
+    @mock.patch(CREATE_BACKUP_OPERATOR_PATH.format("_get_backup"))
+    @mock.patch(CREATE_BACKUP_OPERATOR_PATH.format("get_operation_result"))
+    @mock.patch(CREATE_BACKUP_OPERATOR_PATH.format("log"))
+    @mock.patch(ALLOY_DB_HOOK_PATH, new_callable=mock.PropertyMock)
+    def test_execute_already_exists(
+        self, mock_hook, mock_log, mock_get_operation_result, mock_get_backup, mock_link
+    ):
+        expected_result = mock_get_backup.return_value
+        mock_create_backup = mock_hook.return_value.create_backup
+
+        mock_context = mock.MagicMock()
+
+        result = self.operator.execute(context=mock_context)
+
+        mock_link.persist.assert_called_once_with(
+            context=mock_context,
+            task_instance=self.operator,
+            project_id=TEST_GCP_PROJECT,
+        )
+
+        assert not mock_log.info.called
+        mock_get_backup.assert_called_once()
+        assert not mock_create_backup.called
+        assert not mock_get_operation_result.called
+        assert result == expected_result
+
+    @mock.patch(OPERATOR_MODULE_PATH.format("AlloyDBBackupsLink"))
+    @mock.patch(OPERATOR_MODULE_PATH.format("alloydb_v1.Backup.to_dict"))
+    @mock.patch(CREATE_BACKUP_OPERATOR_PATH.format("_get_backup"))
+    @mock.patch(CREATE_BACKUP_OPERATOR_PATH.format("get_operation_result"))
+    @mock.patch(CREATE_BACKUP_OPERATOR_PATH.format("log"))
+    @mock.patch(ALLOY_DB_HOOK_PATH, new_callable=mock.PropertyMock)
+    def test_execute_exception(
+        self, mock_hook, mock_log, mock_get_operation_result, mock_get_backup, mock_to_dict, mock_link
+    ):
+        mock_get_backup.return_value = None
+        mock_create_backup = mock_hook.return_value.create_backup
+        mock_create_backup.side_effect = Exception()
+        mock_context = mock.MagicMock()
+
+        with pytest.raises(AirflowException):
+            self.operator.execute(context=mock_context)
+
+        mock_link.persist.assert_called_once_with(
+            context=mock_context,
+            task_instance=self.operator,
+            project_id=TEST_GCP_PROJECT,
+        )
+
+        mock_log.info.assert_called_once_with("Creating an AlloyDB backup.")
+        mock_get_backup.assert_called_once()
+        mock_create_backup.assert_called_once_with(
+            backup_id=TEST_BACKUP_ID,
+            backup=TEST_BACKUP,
+            location=TEST_GCP_REGION,
+            project_id=TEST_GCP_PROJECT,
+            request_id=TEST_REQUEST_ID,
+            validate_only=TEST_VALIDATE_ONLY,
+            retry=TEST_RETRY,
+            timeout=TEST_TIMEOUT,
+            metadata=TEST_METADATA,
+        )
+        assert not mock_to_dict.called
+        assert not mock_get_operation_result.called
+
+
+class TestAlloyDBUpdateBackupOperator:
+    def setup_method(self):
+        self.operator = AlloyDBUpdateBackupOperator(
+            task_id=TEST_TASK_ID,
+            backup_id=TEST_BACKUP_ID,
+            backup_configuration=TEST_BACKUP,
+            update_mask=TEST_UPDATE_MASK,
+            allow_missing=TEST_ALLOW_MISSING,
+            project_id=TEST_GCP_PROJECT,
+            location=TEST_GCP_REGION,
+            gcp_conn_id=TEST_GCP_CONN_ID,
+            request_id=TEST_REQUEST_ID,
+            
validate_request=TEST_VALIDATE_ONLY, + retry=TEST_RETRY, + timeout=TEST_TIMEOUT, + metadata=TEST_METADATA, + impersonation_chain=TEST_IMPERSONATION_CHAIN, + ) + + def test_init(self): + assert self.operator.backup_id == TEST_BACKUP_ID + assert self.operator.backup_configuration == TEST_BACKUP + assert self.operator.update_mask == TEST_UPDATE_MASK + assert self.operator.allow_missing == TEST_ALLOW_MISSING + + def test_template_fields(self): + expected_template_fields = { + "backup_id", + "backup_configuration", + "update_mask", + "allow_missing", + } | set(AlloyDBWriteBaseOperator.template_fields) + assert set(AlloyDBUpdateBackupOperator.template_fields) == expected_template_fields + + @mock.patch(OPERATOR_MODULE_PATH.format("AlloyDBBackupsLink")) + @mock.patch(OPERATOR_MODULE_PATH.format("alloydb_v1.Backup.to_dict")) + @mock.patch(UPDATE_BACKUP_OPERATOR_PATH.format("get_operation_result")) + @mock.patch(UPDATE_BACKUP_OPERATOR_PATH.format("log")) + @mock.patch(ALLOY_DB_HOOK_PATH, new_callable=mock.PropertyMock) + def test_execute(self, mock_hook, mock_log, mock_get_operation_result, mock_to_dict, mock_link): + mock_update_backup = mock_hook.return_value.update_backup + mock_operation = mock_update_backup.return_value + mock_operation_result = mock_get_operation_result.return_value + + expected_result = mock_to_dict.return_value + mock_context = mock.MagicMock() + + result = self.operator.execute(context=mock_context) + + mock_link.persist.assert_called_once_with( + context=mock_context, + task_instance=self.operator, + project_id=TEST_GCP_PROJECT, + ) + mock_update_backup.assert_called_once_with( + backup_id=TEST_BACKUP_ID, + project_id=TEST_GCP_PROJECT, + location=TEST_GCP_REGION, + backup=TEST_BACKUP, + update_mask=TEST_UPDATE_MASK, + allow_missing=TEST_ALLOW_MISSING, + request_id=TEST_REQUEST_ID, + validate_only=TEST_VALIDATE_ONLY, + retry=TEST_RETRY, + timeout=TEST_TIMEOUT, + metadata=TEST_METADATA, + ) + mock_get_operation_result.assert_called_once_with(mock_operation) + mock_to_dict.assert_called_once_with(mock_operation_result) + assert result == expected_result + mock_log.info.assert_has_calls( + [ + call("Updating an AlloyDB backup."), + call("AlloyDB backup %s was successfully updated.", TEST_BACKUP_ID), + ] + ) + + @mock.patch(OPERATOR_MODULE_PATH.format("AlloyDBBackupsLink")) + @mock.patch(OPERATOR_MODULE_PATH.format("alloydb_v1.Backup.to_dict")) + @mock.patch(UPDATE_BACKUP_OPERATOR_PATH.format("get_operation_result")) + @mock.patch(UPDATE_BACKUP_OPERATOR_PATH.format("log")) + @mock.patch(ALLOY_DB_HOOK_PATH, new_callable=mock.PropertyMock) + def test_execute_validate_request( + self, mock_hook, mock_log, mock_get_operation_result, mock_to_dict, mock_link + ): + mock_update_ackup = mock_hook.return_value.update_backup + mock_operation = mock_update_ackup.return_value + mock_get_operation_result.return_value = None + + expected_message = "Validating an Update AlloyDB backup request." 
+ mock_context = mock.MagicMock() + self.operator.validate_request = True + + result = self.operator.execute(context=mock_context) + + mock_log.info.assert_called_once_with(expected_message) + mock_update_ackup.assert_called_once_with( + backup_id=TEST_BACKUP_ID, + backup=TEST_BACKUP, + project_id=TEST_GCP_PROJECT, + location=TEST_GCP_REGION, + update_mask=TEST_UPDATE_MASK, + allow_missing=TEST_ALLOW_MISSING, + request_id=TEST_REQUEST_ID, + validate_only=True, + retry=TEST_RETRY, + timeout=TEST_TIMEOUT, + metadata=TEST_METADATA, + ) + mock_get_operation_result.assert_called_once_with(mock_operation) + assert not mock_to_dict.called + mock_link.persist.assert_called_once_with( + context=mock_context, + task_instance=self.operator, + project_id=TEST_GCP_PROJECT, + ) + assert result is None + + @mock.patch(OPERATOR_MODULE_PATH.format("AlloyDBBackupsLink")) + @mock.patch(OPERATOR_MODULE_PATH.format("alloydb_v1.Backup.to_dict")) + @mock.patch(UPDATE_BACKUP_OPERATOR_PATH.format("get_operation_result")) + @mock.patch(UPDATE_BACKUP_OPERATOR_PATH.format("log")) + @mock.patch(ALLOY_DB_HOOK_PATH, new_callable=mock.PropertyMock) + def test_execute_exception(self, mock_hook, mock_log, mock_get_operation_result, mock_to_dict, mock_link): + mock_update_backup = mock_hook.return_value.update_backup + mock_update_backup.side_effect = Exception + + mock_context = mock.MagicMock() + + with pytest.raises(AirflowException): + self.operator.execute(context=mock_context) + + mock_link.persist.assert_called_once_with( + context=mock_context, + task_instance=self.operator, + project_id=TEST_GCP_PROJECT, + ) + mock_update_backup.assert_called_once_with( + backup_id=TEST_BACKUP_ID, + backup=TEST_BACKUP, + project_id=TEST_GCP_PROJECT, + location=TEST_GCP_REGION, + update_mask=TEST_UPDATE_MASK, + allow_missing=TEST_ALLOW_MISSING, + request_id=TEST_REQUEST_ID, + validate_only=TEST_VALIDATE_ONLY, + retry=TEST_RETRY, + timeout=TEST_TIMEOUT, + metadata=TEST_METADATA, + ) + assert not mock_get_operation_result.called + assert not mock_to_dict.called + mock_log.info.assert_called_once_with("Updating an AlloyDB backup.") + + +class TestAlloyDBDeleteBackupOperator: + def setup_method(self): + self.operator = AlloyDBDeleteBackupOperator( + task_id=TEST_TASK_ID, + backup_id=TEST_BACKUP_ID, + project_id=TEST_GCP_PROJECT, + location=TEST_GCP_REGION, + gcp_conn_id=TEST_GCP_CONN_ID, + request_id=TEST_REQUEST_ID, + validate_request=TEST_VALIDATE_ONLY, + retry=TEST_RETRY, + timeout=TEST_TIMEOUT, + metadata=TEST_METADATA, + impersonation_chain=TEST_IMPERSONATION_CHAIN, + ) + + def test_init(self): + assert self.operator.backup_id == TEST_BACKUP_ID + + def test_template_fields(self): + expected_template_fields = {"backup_id"} | set(AlloyDBWriteBaseOperator.template_fields) + assert set(AlloyDBDeleteBackupOperator.template_fields) == expected_template_fields + + @mock.patch(DELETE_BACKUP_OPERATOR_PATH.format("get_operation_result")) + @mock.patch(DELETE_BACKUP_OPERATOR_PATH.format("log")) + @mock.patch(ALLOY_DB_HOOK_PATH, new_callable=mock.PropertyMock) + def test_execute(self, mock_hook, mock_log, mock_get_operation_result): + mock_delete_backup = mock_hook.return_value.delete_backup + mock_operation = mock_delete_backup.return_value + mock_context = mock.MagicMock() + + result = self.operator.execute(context=mock_context) + + mock_delete_backup.assert_called_once_with( + backup_id=TEST_BACKUP_ID, + project_id=TEST_GCP_PROJECT, + location=TEST_GCP_REGION, + request_id=TEST_REQUEST_ID, + validate_only=TEST_VALIDATE_ONLY, + 
retry=TEST_RETRY, + timeout=TEST_TIMEOUT, + metadata=TEST_METADATA, + ) + mock_get_operation_result.assert_called_once_with(mock_operation) + assert result is None + mock_log.info.assert_has_calls( + [ + call("Deleting an AlloyDB backup."), + call("AlloyDB backup %s was successfully removed.", TEST_BACKUP_ID), + ] + ) + + @mock.patch(DELETE_BACKUP_OPERATOR_PATH.format("get_operation_result")) + @mock.patch(DELETE_BACKUP_OPERATOR_PATH.format("log")) + @mock.patch(ALLOY_DB_HOOK_PATH, new_callable=mock.PropertyMock) + def test_execute_validate_request(self, mock_hook, mock_log, mock_get_operation_result): + mock_delete_backup = mock_hook.return_value.delete_backup + mock_operation = mock_delete_backup.return_value + mock_context = mock.MagicMock() + self.operator.validate_request = True + + result = self.operator.execute(context=mock_context) + + mock_delete_backup.assert_called_once_with( + backup_id=TEST_BACKUP_ID, + project_id=TEST_GCP_PROJECT, + location=TEST_GCP_REGION, + request_id=TEST_REQUEST_ID, + validate_only=True, + retry=TEST_RETRY, + timeout=TEST_TIMEOUT, + metadata=TEST_METADATA, + ) + mock_get_operation_result.assert_called_once_with(mock_operation) + assert result is None + mock_log.info.assert_called_once_with("Validating a Delete AlloyDB backup request.") + + @mock.patch(DELETE_BACKUP_OPERATOR_PATH.format("get_operation_result")) + @mock.patch(DELETE_BACKUP_OPERATOR_PATH.format("log")) + @mock.patch(ALLOY_DB_HOOK_PATH, new_callable=mock.PropertyMock) + def test_execute_exception(self, mock_hook, mock_log, mock_get_operation_result): + mock_delete_backup = mock_hook.return_value.delete_backup + mock_delete_backup.side_effect = Exception + mock_context = mock.MagicMock() + + with pytest.raises(AirflowException): + _ = self.operator.execute(context=mock_context) + + mock_delete_backup.assert_called_once_with( + backup_id=TEST_BACKUP_ID, + project_id=TEST_GCP_PROJECT, + location=TEST_GCP_REGION, + request_id=TEST_REQUEST_ID, + validate_only=TEST_VALIDATE_ONLY, + retry=TEST_RETRY, + timeout=TEST_TIMEOUT, + metadata=TEST_METADATA, + ) + assert not mock_get_operation_result.called + mock_log.info.assert_called_once_with("Deleting an AlloyDB backup.") diff --git a/providers/tests/system/google/cloud/alloy_db/example_alloy_db.py b/providers/tests/system/google/cloud/alloy_db/example_alloy_db.py index c9158d1e110bc..166b1a78c8e3c 100644 --- a/providers/tests/system/google/cloud/alloy_db/example_alloy_db.py +++ b/providers/tests/system/google/cloud/alloy_db/example_alloy_db.py @@ -25,12 +25,18 @@ from airflow.models.dag import DAG from airflow.providers.google.cloud.operators.alloy_db import ( + AlloyDBCreateBackupOperator, AlloyDBCreateClusterOperator, AlloyDBCreateInstanceOperator, + AlloyDBCreateUserOperator, + AlloyDBDeleteBackupOperator, AlloyDBDeleteClusterOperator, AlloyDBDeleteInstanceOperator, + AlloyDBDeleteUserOperator, + AlloyDBUpdateBackupOperator, AlloyDBUpdateClusterOperator, AlloyDBUpdateInstanceOperator, + AlloyDBUpdateUserOperator, ) from airflow.utils.trigger_rule import TriggerRule @@ -40,6 +46,7 @@ GCP_LOCATION = "europe-north1" GCP_LOCATION_SECONDARY = "europe-west1" +GCP_LOCATION_BACKUP = "europe-west2" GCP_NETWORK = "default" CLUSTER_USER = "postgres-test" CLUSTER_USER_PASSWORD = "postgres-test-pa$$w0rd" @@ -74,6 +81,28 @@ "instance_type": "SECONDARY", } SECONDARY_INSTANCE_ID = f"instance-secondary-{DAG_ID}-{ENV_ID}".replace("_", "-") +USER_ID = "test-user" +USER = { + "password": "Test-Pa$$w0rd", + "user_type": "ALLOYDB_BUILT_IN", +} +USER_UPDATE = { + 
"database_roles": [ + "alloydbsuperuser", + ] +} +USER_UPDATE_MASK = { + "paths": ["database_roles"], +} +BACKUP_ID = "test-backup" +BACKUP = { + "cluster_name": f"projects/{GCP_PROJECT_ID}/locations/{GCP_LOCATION}/clusters/{CLUSTER_ID}", + "type": "ON_DEMAND", +} +BACKUP_UPDATE = {"labels": {"label_test": "test_value"}} +BACKUP_UPDATE_MASK = { + "paths": ["labels"], +} with DAG( DAG_ID, @@ -116,6 +145,27 @@ ) # [END howto_operator_alloy_db_create_instance] + # [START howto_operator_alloy_db_create_backup] + create_backup = AlloyDBCreateBackupOperator( + task_id="create_backup", + backup_id=BACKUP_ID, + backup_configuration=BACKUP, + location=GCP_LOCATION_BACKUP, + project_id=GCP_PROJECT_ID, + ) + # [END howto_operator_alloy_db_create_backup] + + # [START howto_operator_alloy_db_update_backup] + update_backup = AlloyDBUpdateBackupOperator( + task_id="update_backup", + backup_id=BACKUP_ID, + backup_configuration=BACKUP_UPDATE, + update_mask=BACKUP_UPDATE_MASK, + location=GCP_LOCATION_BACKUP, + project_id=GCP_PROJECT_ID, + ) + # [END howto_operator_alloy_db_update_backup] + # [START howto_operator_alloy_db_update_instance] update_instance = AlloyDBUpdateInstanceOperator( task_id="update_instance", @@ -147,6 +197,49 @@ location=GCP_LOCATION_SECONDARY, ) + # [START howto_operator_alloy_db_create_user] + creat_user = AlloyDBCreateUserOperator( + task_id="create_user", + user_id=USER_ID, + user_configuration=USER, + cluster_id=CLUSTER_ID, + project_id=GCP_PROJECT_ID, + location=GCP_LOCATION, + ) + # [END howto_operator_alloy_db_create_user] + + # [START howto_operator_alloy_db_update_user] + update_user = AlloyDBUpdateUserOperator( + task_id="update_user", + user_id=USER_ID, + user_configuration=USER_UPDATE, + cluster_id=CLUSTER_ID, + update_mask=USER_UPDATE_MASK, + location=GCP_LOCATION, + project_id=GCP_PROJECT_ID, + ) + # [END howto_operator_alloy_db_update_user] + + # [START howto_operator_alloy_db_delete_user] + delete_user = AlloyDBDeleteUserOperator( + task_id="delete_user", + cluster_id=CLUSTER_ID, + user_id=USER_ID, + project_id=GCP_PROJECT_ID, + location=GCP_LOCATION, + ) + # [END howto_operator_alloy_db_delete_user] + + # [START howto_operator_alloy_db_delete_backup] + delete_backup = AlloyDBDeleteBackupOperator( + task_id="delete_backup", + backup_id=BACKUP_ID, + location=GCP_LOCATION_BACKUP, + project_id=GCP_PROJECT_ID, + ) + # [END howto_operator_alloy_db_delete_backup] + delete_backup.trigger_rule = TriggerRule.ALL_DONE + # [START howto_operator_alloy_db_delete_instance] delete_instance = AlloyDBDeleteInstanceOperator( task_id="delete_instance", @@ -182,8 +275,14 @@ >> update_cluster >> create_instance >> update_instance + >> create_backup + >> update_backup >> create_secondary_cluster >> create_secondary_instance + >> creat_user + >> update_user + >> delete_user + >> delete_backup >> delete_secondary_cluster >> delete_instance >> delete_cluster