Add new Dataplex Catalog Entry Group operators #45751

Merged
merged 1 commit into from
Jan 18, 2025
94 changes: 94 additions & 0 deletions docs/apache-airflow-providers-google/operators/cloud/dataplex.rst
@@ -417,3 +417,97 @@ To get a Data Profile scan job you can use:
:dedent: 4
:start-after: [START howto_dataplex_get_data_profile_job_operator]
:end-before: [END howto_dataplex_get_data_profile_job_operator]


Google Dataplex Catalog Operators
=================================

Dataplex Catalog provides a unified inventory of Google Cloud resources, such as BigQuery, and other resources,
such as on-premises resources. Dataplex Catalog automatically retrieves metadata for Google Cloud resources,
and you bring metadata for third-party resources into Dataplex Catalog.

For more information about Dataplex Catalog, visit the `Dataplex Catalog product documentation <https://cloud.google.com/dataplex/docs/catalog-overview>`__.

.. _howto/operator:DataplexCatalogCreateEntryGroupOperator:

Create an EntryGroup
--------------------

To create an Entry Group in a specific location in Dataplex Catalog, you can
use :class:`~airflow.providers.google.cloud.operators.dataplex.DataplexCatalogCreateEntryGroupOperator`.
For more information about the available fields to pass when creating an Entry Group, visit the `Entry Group resource configuration <https://cloud.google.com/dataplex/docs/reference/rest/v1/projects.locations.entryGroups#EntryGroup>`__.

A simple Entry Group configuration can look as follows:

.. exampleinclude:: /../../providers/tests/system/google/cloud/dataplex/example_dataplex_catalog.py
:language: python
:dedent: 0
:start-after: [START howto_dataplex_entry_group_configuration]
:end-before: [END howto_dataplex_entry_group_configuration]
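
As a rough illustration of the shape of such a configuration, the sketch below defines a plain dictionary. The variable name and all values are hypothetical and are not taken from the system test; the keys are assumed to correspond to the ``display_name``, ``description`` and ``labels`` fields of the EntryGroup resource.

```python
# Hypothetical Entry Group body; every value here is illustrative only.
# The keys are assumed to match fields of the EntryGroup resource.
ENTRY_GROUP_BODY = {
    "display_name": "Example entry group",
    "description": "Entry group holding metadata for third-party resources",
    "labels": {"team": "data-platform"},
}
```

A dictionary is used here because the hook accepts either an ``EntryGroup`` object or a ``dict`` for the configuration.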

With this configuration, you can create an Entry Group resource using
:class:`~airflow.providers.google.cloud.operators.dataplex.DataplexCatalogCreateEntryGroupOperator`:

.. exampleinclude:: /../../providers/tests/system/google/cloud/dataplex/example_dataplex_catalog.py
:language: python
:dedent: 4
:start-after: [START howto_operator_dataplex_catalog_create_entry_group]
:end-before: [END howto_operator_dataplex_catalog_create_entry_group]

.. _howto/operator:DataplexCatalogDeleteEntryGroupOperator:

Delete an EntryGroup
--------------------

To delete an Entry Group in a specific location in Dataplex Catalog, you can
use :class:`~airflow.providers.google.cloud.operators.dataplex.DataplexCatalogDeleteEntryGroupOperator`.

.. exampleinclude:: /../../providers/tests/system/google/cloud/dataplex/example_dataplex_catalog.py
:language: python
:dedent: 4
:start-after: [START howto_operator_dataplex_catalog_delete_entry_group]
:end-before: [END howto_operator_dataplex_catalog_delete_entry_group]

.. _howto/operator:DataplexCatalogListEntryGroupsOperator:

List EntryGroups
----------------

To list all Entry Groups in a specific location in Dataplex Catalog, you can
use :class:`~airflow.providers.google.cloud.operators.dataplex.DataplexCatalogListEntryGroupsOperator`.
This operator also supports filtering and ordering the results.

.. exampleinclude:: /../../providers/tests/system/google/cloud/dataplex/example_dataplex_catalog.py
:language: python
:dedent: 4
:start-after: [START howto_operator_dataplex_catalog_list_entry_groups]
:end-before: [END howto_operator_dataplex_catalog_list_entry_groups]
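
The filtering, ordering and paging options end up in the list request that the hook sends to the Catalog service. The following is a minimal sketch of that request, mirroring the ``list_entry_groups`` hook method in this change. The function name is hypothetical, the parent path format is an assumption about what ``common_location_path`` returns, and the ``order_by`` value is illustrative only; consult the API reference for the fields the service actually supports.

```python
from typing import Optional


def build_list_entry_groups_request(
    project_id: str,
    location: str,
    filter_by: Optional[str] = None,
    order_by: Optional[str] = None,
    page_size: Optional[int] = None,
    page_token: Optional[str] = None,
) -> dict:
    """Build a request dict shaped like the one the hook passes to the client."""
    return {
        # Assumed format of the location resource name.
        "parent": f"projects/{project_id}/locations/{location}",
        "filter": filter_by,
        "order_by": order_by,
        "page_size": page_size,
        "page_token": page_token,
    }


# Illustrative call: the order_by value is an assumption, not a documented field list.
request = build_list_entry_groups_request(
    "my-project", "us-central1", order_by="name", page_size=50
)
```

Options that are left unset stay ``None`` in the request, which matches how the hook forwards them unchanged.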

.. _howto/operator:DataplexCatalogGetEntryGroupOperator:

Get an EntryGroup
-----------------

To retrieve an Entry Group in a specific location in Dataplex Catalog, you can
use :class:`~airflow.providers.google.cloud.operators.dataplex.DataplexCatalogGetEntryGroupOperator`.

.. exampleinclude:: /../../providers/tests/system/google/cloud/dataplex/example_dataplex_catalog.py
:language: python
:dedent: 4
:start-after: [START howto_operator_dataplex_catalog_get_entry_group]
:end-before: [END howto_operator_dataplex_catalog_get_entry_group]
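
The hook identifies an Entry Group by its full resource name, which it builds from the project, location and Entry Group ID via the client's ``entry_group_path`` helper. The sketch below reproduces that name with a plain function. The format is assumed from the ``projects.locations.entryGroups`` REST resource rather than copied from the client library, and the example identifiers are hypothetical.

```python
def entry_group_path(project: str, location: str, entry_group: str) -> str:
    """Return the assumed fully qualified resource name of an Entry Group."""
    return f"projects/{project}/locations/{location}/entryGroups/{entry_group}"


# Hypothetical identifiers, for illustration only.
name = entry_group_path("my-project", "us-central1", "my-entry-group")
```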

.. _howto/operator:DataplexCatalogUpdateEntryGroupOperator:

Update an EntryGroup
--------------------

To update an Entry Group in a specific location in Dataplex Catalog, you can
use :class:`~airflow.providers.google.cloud.operators.dataplex.DataplexCatalogUpdateEntryGroupOperator`.

.. exampleinclude:: /../../providers/tests/system/google/cloud/dataplex/example_dataplex_catalog.py
:language: python
:dedent: 4
:start-after: [START howto_operator_dataplex_catalog_update_entry_group]
:end-before: [END howto_operator_dataplex_catalog_update_entry_group]
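
When updating, the hook copies the supplied configuration, sets the resource name on the copy, and converts a list of field names into an update mask. The following sketch shows that preparation step using only the standard library. The function name is hypothetical, the resource name format is an assumption, and a plain ``{"paths": [...]}`` dictionary stands in for the protobuf ``FieldMask`` that the real hook builds.

```python
from copy import deepcopy
from typing import Optional


def prepare_update_request(
    project_id: str,
    location: str,
    entry_group_id: str,
    entry_group_configuration: dict,
    update_mask: Optional[list] = None,
    validate_only: bool = False,
) -> dict:
    """Sketch of how an update request could be assembled from user input."""
    # Work on a copy so the caller's dictionary is never mutated.
    entry_group = deepcopy(entry_group_configuration)
    # Assumed resource name format for an Entry Group.
    entry_group["name"] = (
        f"projects/{project_id}/locations/{location}/entryGroups/{entry_group_id}"
    )
    # Stand-in for FieldMask(paths=...): the real hook wraps a list in a FieldMask.
    mask = {"paths": list(update_mask)} if isinstance(update_mask, list) else update_mask
    return {
        "entry_group": entry_group,
        "update_mask": mask,
        "validate_only": validate_only,
    }


# Hypothetical values, for illustration only.
new_configuration = {"description": "Updated description"}
update_request = prepare_update_request(
    "my-project", "us-central1", "my-entry-group", new_configuration, ["description"]
)
```

Per the hook's docstring, an absent or empty update mask means all modifiable fields are overwritten, so passing an explicit list of field names is the safer choice when only a few fields should change.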
2 changes: 2 additions & 0 deletions docs/spelling_wordlist.txt
@@ -574,6 +574,8 @@ encodable
encryptor
enqueue
enqueued
EntryGroup
EntryGroups
entrypoint
entrypoints
Enum
2 changes: 1 addition & 1 deletion generated/provider_dependencies.json
@@ -656,7 +656,7 @@
"google-cloud-datacatalog>=3.23.0",
"google-cloud-dataflow-client>=0.8.6",
"google-cloud-dataform>=0.5.0",
-"google-cloud-dataplex>=1.10.0",
+"google-cloud-dataplex>=2.6.0",
"google-cloud-dataproc-metastore>=1.12.0",
"google-cloud-dataproc>=5.12.0",
"google-cloud-dlp>=3.12.0",
212 changes: 211 additions & 1 deletion providers/src/airflow/providers/google/cloud/hooks/dataplex.py
@@ -20,15 +20,22 @@

import time
from collections.abc import Sequence
from copy import deepcopy
from typing import TYPE_CHECKING, Any

from google.api_core.client_options import ClientOptions
from google.api_core.gapic_v1.method import DEFAULT, _MethodDefault
-from google.cloud.dataplex_v1 import DataplexServiceClient, DataScanServiceAsyncClient, DataScanServiceClient
+from google.cloud.dataplex_v1 import (
+DataplexServiceClient,
+DataScanServiceAsyncClient,
+DataScanServiceClient,
+)
from google.cloud.dataplex_v1.services.catalog_service import CatalogServiceClient
from google.cloud.dataplex_v1.types import (
Asset,
DataScan,
DataScanJob,
EntryGroup,
Lake,
Task,
Zone,
@@ -47,6 +54,7 @@
from google.api_core.operation import Operation
from google.api_core.retry import Retry
from google.api_core.retry_async import AsyncRetry
from google.cloud.dataplex_v1.services.catalog_service.pagers import ListEntryGroupsPager
from googleapiclient.discovery import Resource

PATH_DATA_SCAN = "projects/{project_id}/locations/{region}/dataScans/{data_scan_id}"
@@ -110,6 +118,14 @@ def get_dataplex_data_scan_client(self) -> DataScanServiceClient:
credentials=self.get_credentials(), client_info=CLIENT_INFO, client_options=client_options
)

def get_dataplex_catalog_client(self) -> CatalogServiceClient:
"""Return CatalogServiceClient."""
client_options = ClientOptions(api_endpoint="dataplex.googleapis.com:443")

return CatalogServiceClient(
credentials=self.get_credentials(), client_info=CLIENT_INFO, client_options=client_options
)

def wait_for_operation(self, timeout: float | None, operation: Operation):
"""Wait for long-lasting operation to complete."""
try:
@@ -118,6 +134,200 @@ def wait_for_operation(self, timeout: float | None, operation: Operation):
error = operation.exception(timeout=timeout)
raise AirflowException(error)

@GoogleBaseHook.fallback_to_default_project_id
def create_entry_group(
self,
location: str,
entry_group_id: str,
entry_group_configuration: EntryGroup | dict,
project_id: str = PROVIDE_PROJECT_ID,
validate_only: bool = False,
retry: Retry | _MethodDefault = DEFAULT,
timeout: float | None = None,
metadata: Sequence[tuple[str, str]] = (),
) -> Operation:
"""
Create an EntryGroup resource.

:param location: Required. The ID of the Google Cloud location that the task belongs to.
:param entry_group_id: Required. EntryGroup identifier.
:param entry_group_configuration: Required. EntryGroup configuration body.
:param project_id: Optional. The ID of the Google Cloud project that the task belongs to.
:param validate_only: Optional. If set, performs request validation, but does not actually execute
the create request.
:param retry: Optional. A retry object used to retry requests. If `None` is specified, requests
will not be retried.
:param timeout: Optional. The amount of time, in seconds, to wait for the request to complete.
Note that if `retry` is specified, the timeout applies to each individual attempt.
:param metadata: Optional. Additional metadata that is provided to the method.
"""
client = self.get_dataplex_catalog_client()
return client.create_entry_group(
request={
"parent": client.common_location_path(project_id, location),
"entry_group_id": entry_group_id,
"entry_group": entry_group_configuration,
"validate_only": validate_only,
},
retry=retry,
timeout=timeout,
metadata=metadata,
)

@GoogleBaseHook.fallback_to_default_project_id
def get_entry_group(
self,
location: str,
entry_group_id: str,
project_id: str = PROVIDE_PROJECT_ID,
retry: Retry | _MethodDefault = DEFAULT,
timeout: float | None = None,
metadata: Sequence[tuple[str, str]] = (),
) -> EntryGroup:
"""
Get an EntryGroup resource.

:param location: Required. The ID of the Google Cloud location that the task belongs to.
:param entry_group_id: Required. EntryGroup identifier.
:param project_id: Optional. The ID of the Google Cloud project that the task belongs to.
:param retry: Optional. A retry object used to retry requests. If `None` is specified, requests
will not be retried.
:param timeout: Optional. The amount of time, in seconds, to wait for the request to complete.
Note that if `retry` is specified, the timeout applies to each individual attempt.
:param metadata: Optional. Additional metadata that is provided to the method.
"""
client = self.get_dataplex_catalog_client()
return client.get_entry_group(
request={
"name": client.entry_group_path(project_id, location, entry_group_id),
},
retry=retry,
timeout=timeout,
metadata=metadata,
)

@GoogleBaseHook.fallback_to_default_project_id
def delete_entry_group(
self,
location: str,
entry_group_id: str,
project_id: str = PROVIDE_PROJECT_ID,
retry: Retry | _MethodDefault = DEFAULT,
timeout: float | None = None,
metadata: Sequence[tuple[str, str]] = (),
) -> Operation:
"""
Delete an EntryGroup resource.

:param location: Required. The ID of the Google Cloud location that the task belongs to.
:param entry_group_id: Required. EntryGroup identifier.
:param project_id: Optional. The ID of the Google Cloud project that the task belongs to.
:param retry: Optional. A retry object used to retry requests. If `None` is specified, requests
will not be retried.
:param timeout: Optional. The amount of time, in seconds, to wait for the request to complete.
Note that if `retry` is specified, the timeout applies to each individual attempt.
:param metadata: Optional. Additional metadata that is provided to the method.
"""
client = self.get_dataplex_catalog_client()
return client.delete_entry_group(
request={
"name": client.entry_group_path(project_id, location, entry_group_id),
},
retry=retry,
timeout=timeout,
metadata=metadata,
)

@GoogleBaseHook.fallback_to_default_project_id
def list_entry_groups(
self,
location: str,
filter_by: str | None = None,
order_by: str | None = None,
page_size: int | None = None,
page_token: str | None = None,
project_id: str = PROVIDE_PROJECT_ID,
retry: Retry | _MethodDefault = DEFAULT,
timeout: float | None = None,
metadata: Sequence[tuple[str, str]] = (),
) -> ListEntryGroupsPager:
"""
List EntryGroup resources from a specific location.

:param location: Required. The ID of the Google Cloud location that the task belongs to.
:param filter_by: Optional. Filter to apply on the list results.
:param order_by: Optional. Fields to order the results by.
:param page_size: Optional. Maximum number of EntryGroups to return on one page.
:param page_token: Optional. Token to retrieve the next page of results.
:param project_id: Optional. The ID of the Google Cloud project that the task belongs to.
:param retry: Optional. A retry object used to retry requests. If `None` is specified, requests
will not be retried.
:param timeout: Optional. The amount of time, in seconds, to wait for the request to complete.
Note that if `retry` is specified, the timeout applies to each individual attempt.
:param metadata: Optional. Additional metadata that is provided to the method.
"""
client = self.get_dataplex_catalog_client()
return client.list_entry_groups(
request={
"parent": client.common_location_path(project_id, location),
"filter": filter_by,
"order_by": order_by,
"page_size": page_size,
"page_token": page_token,
},
retry=retry,
timeout=timeout,
metadata=metadata,
)

@GoogleBaseHook.fallback_to_default_project_id
def update_entry_group(
self,
location: str,
entry_group_id: str,
entry_group_configuration: dict | EntryGroup,
project_id: str = PROVIDE_PROJECT_ID,
update_mask: list[str] | FieldMask | None = None,
validate_only: bool | None = False,
retry: Retry | _MethodDefault = DEFAULT,
timeout: float | None = None,
metadata: Sequence[tuple[str, str]] = (),
) -> Operation:
"""
Update an EntryGroup resource.

:param entry_group_id: Required. ID of the EntryGroup to update.
:param entry_group_configuration: Required. The updated configuration body of the EntryGroup.
:param location: Required. The ID of the Google Cloud location that the task belongs to.
:param update_mask: Optional. Names of fields whose values to overwrite on an entry group.
If this parameter is absent or empty, all modifiable fields are overwritten. If such
fields are non-required and omitted in the request body, their values are emptied.
:param project_id: Optional. The ID of the Google Cloud project that the task belongs to.
:param validate_only: Optional. The service validates the request without performing any mutations.
:param retry: Optional. A retry object used to retry requests. If `None` is specified, requests
will not be retried.
:param timeout: Optional. The amount of time, in seconds, to wait for the request to complete.
Note that if `retry` is specified, the timeout applies to each individual attempt.
:param metadata: Optional. Additional metadata that is provided to the method.
"""
client = self.get_dataplex_catalog_client()
_entry_group = (
deepcopy(entry_group_configuration)
if isinstance(entry_group_configuration, dict)
else EntryGroup.to_dict(entry_group_configuration)
)
_entry_group["name"] = client.entry_group_path(project_id, location, entry_group_id)
return client.update_entry_group(
request={
"entry_group": _entry_group,
"update_mask": FieldMask(paths=update_mask) if isinstance(update_mask, list) else update_mask,
"validate_only": validate_only,
},
retry=retry,
timeout=timeout,
metadata=metadata,
)

@GoogleBaseHook.fallback_to_default_project_id
def create_task(
self,