
Commit 3b50a06

VladaZakharova and Ulada Zakharava authored
Add Dataplex Catalog Entry Group operators (#45751)
Co-authored-by: Ulada Zakharava <[email protected]>
1 parent 375baaf commit 3b50a06

12 files changed (+1392 -8 lines changed)

docs/apache-airflow-providers-google/operators/cloud/dataplex.rst

+94
@@ -417,3 +417,97 @@ To get a Data Profile scan job you can use:
     :dedent: 4
     :start-after: [START howto_dataplex_get_data_profile_job_operator]
     :end-before: [END howto_dataplex_get_data_profile_job_operator]
+
+
+Google Dataplex Catalog Operators
+=================================
+
+Dataplex Catalog provides a unified inventory of Google Cloud resources, such as BigQuery, and other resources,
+such as on-premises resources. Dataplex Catalog automatically retrieves metadata for Google Cloud resources,
+and you bring metadata for third-party resources into Dataplex Catalog.
+
+For more information about Dataplex Catalog, visit the `Dataplex Catalog product documentation <https://cloud.google.com/dataplex/docs/catalog-overview>`__.
+
+.. _howto/operator:DataplexCatalogCreateEntryGroupOperator:
+
+Create an EntryGroup
+--------------------
+
+To create an Entry Group in a specific location in Dataplex Catalog you can
+use :class:`~airflow.providers.google.cloud.operators.dataplex.DataplexCatalogCreateEntryGroupOperator`.
+For more information about the available fields to pass when creating an Entry Group, visit `Entry Group resource configuration <https://cloud.google.com/dataplex/docs/reference/rest/v1/projects.locations.entryGroups#EntryGroup>`__.
+
+A simple Entry Group configuration can look as follows:
+
+.. exampleinclude:: /../../providers/tests/system/google/cloud/dataplex/example_dataplex_catalog.py
+    :language: python
+    :dedent: 0
+    :start-after: [START howto_dataplex_entry_group_configuration]
+    :end-before: [END howto_dataplex_entry_group_configuration]
+
+With this configuration you can create an Entry Group resource:
+
+:class:`~airflow.providers.google.cloud.operators.dataplex.DataplexCatalogCreateEntryGroupOperator`
+
+.. exampleinclude:: /../../providers/tests/system/google/cloud/dataplex/example_dataplex_catalog.py
+    :language: python
+    :dedent: 4
+    :start-after: [START howto_operator_dataplex_catalog_create_entry_group]
+    :end-before: [END howto_operator_dataplex_catalog_create_entry_group]
+
+.. _howto/operator:DataplexCatalogDeleteEntryGroupOperator:
+
+Delete an EntryGroup
+--------------------
+
+To delete an Entry Group in a specific location in Dataplex Catalog you can
+use :class:`~airflow.providers.google.cloud.operators.dataplex.DataplexCatalogDeleteEntryGroupOperator`.
+
+.. exampleinclude:: /../../providers/tests/system/google/cloud/dataplex/example_dataplex_catalog.py
+    :language: python
+    :dedent: 4
+    :start-after: [START howto_operator_dataplex_catalog_delete_entry_group]
+    :end-before: [END howto_operator_dataplex_catalog_delete_entry_group]
+
+.. _howto/operator:DataplexCatalogListEntryGroupsOperator:
+
+List EntryGroups
+----------------
+
+To list all Entry Groups in a specific location in Dataplex Catalog you can
+use :class:`~airflow.providers.google.cloud.operators.dataplex.DataplexCatalogListEntryGroupsOperator`.
+This operator also supports filtering and ordering the result of the operation.
+
+.. exampleinclude:: /../../providers/tests/system/google/cloud/dataplex/example_dataplex_catalog.py
+    :language: python
+    :dedent: 4
+    :start-after: [START howto_operator_dataplex_catalog_list_entry_groups]
+    :end-before: [END howto_operator_dataplex_catalog_list_entry_groups]
+
+.. _howto/operator:DataplexCatalogGetEntryGroupOperator:
+
+Get an EntryGroup
+-----------------
+
+To retrieve an Entry Group in a specific location in Dataplex Catalog you can
+use :class:`~airflow.providers.google.cloud.operators.dataplex.DataplexCatalogGetEntryGroupOperator`.
+
+.. exampleinclude:: /../../providers/tests/system/google/cloud/dataplex/example_dataplex_catalog.py
+    :language: python
+    :dedent: 4
+    :start-after: [START howto_operator_dataplex_catalog_get_entry_group]
+    :end-before: [END howto_operator_dataplex_catalog_get_entry_group]
+
+.. _howto/operator:DataplexCatalogUpdateEntryGroupOperator:
+
+Update an EntryGroup
+--------------------
+
+To update an Entry Group in a specific location in Dataplex Catalog you can
+use :class:`~airflow.providers.google.cloud.operators.dataplex.DataplexCatalogUpdateEntryGroupOperator`.
+
+.. exampleinclude:: /../../providers/tests/system/google/cloud/dataplex/example_dataplex_catalog.py
+    :language: python
+    :dedent: 4
+    :start-after: [START howto_operator_dataplex_catalog_update_entry_group]
+    :end-before: [END howto_operator_dataplex_catalog_update_entry_group]
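
Review note: the referenced system test file is not part of this view, so the following is only a minimal sketch of what an Entry Group configuration and its resource names might look like. It assumes the resource name follows the `projects/*/locations/*/entryGroups/*` pattern implied by the REST reference linked in the docs. The helper names `location_path` and `entry_group_path`, and all sample values, are hypothetical stand-ins and are not taken from the commit.

```python
# Hypothetical identifiers, for illustration only.
PROJECT_ID = "my-project"
LOCATION = "us-central1"
ENTRY_GROUP_ID = "my-entry-group"

# Assumed minimal configuration body. The keys are fields of the EntryGroup
# REST resource; the values are made up.
ENTRY_GROUP_BODY = {
    "display_name": "Display Name",
    "description": "Some description",
}


def location_path(project_id: str, location: str) -> str:
    """Build the parent path used when creating or listing Entry Groups."""
    return f"projects/{project_id}/locations/{location}"


def entry_group_path(project_id: str, location: str, entry_group_id: str) -> str:
    """Build the full resource name used when getting, updating or deleting."""
    return f"{location_path(project_id, location)}/entryGroups/{entry_group_id}"


if __name__ == "__main__":
    print(location_path(PROJECT_ID, LOCATION))
    print(entry_group_path(PROJECT_ID, LOCATION, ENTRY_GROUP_ID))
```

Create and list requests are scoped to the location path, while get, update and delete address a single Entry Group by its full name.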

docs/spelling_wordlist.txt

+2
@@ -574,6 +574,8 @@ encodable
 encryptor
 enqueue
 enqueued
+EntryGroup
+EntryGroups
 entrypoint
 entrypoints
 Enum

generated/provider_dependencies.json

+1-1
@@ -656,7 +656,7 @@
       "google-cloud-datacatalog>=3.23.0",
       "google-cloud-dataflow-client>=0.8.6",
       "google-cloud-dataform>=0.5.0",
-      "google-cloud-dataplex>=1.10.0",
+      "google-cloud-dataplex>=2.6.0",
       "google-cloud-dataproc-metastore>=1.12.0",
       "google-cloud-dataproc>=5.12.0",
       "google-cloud-dlp>=3.12.0",

providers/src/airflow/providers/google/cloud/hooks/dataplex.py

+211-1
@@ -20,15 +20,22 @@
 
 import time
 from collections.abc import Sequence
+from copy import deepcopy
 from typing import TYPE_CHECKING, Any
 
 from google.api_core.client_options import ClientOptions
 from google.api_core.gapic_v1.method import DEFAULT, _MethodDefault
-from google.cloud.dataplex_v1 import DataplexServiceClient, DataScanServiceAsyncClient, DataScanServiceClient
+from google.cloud.dataplex_v1 import (
+    DataplexServiceClient,
+    DataScanServiceAsyncClient,
+    DataScanServiceClient,
+)
+from google.cloud.dataplex_v1.services.catalog_service import CatalogServiceClient
 from google.cloud.dataplex_v1.types import (
     Asset,
     DataScan,
     DataScanJob,
+    EntryGroup,
     Lake,
     Task,
     Zone,
@@ -47,6 +54,7 @@
     from google.api_core.operation import Operation
     from google.api_core.retry import Retry
     from google.api_core.retry_async import AsyncRetry
+    from google.cloud.dataplex_v1.services.catalog_service.pagers import ListEntryGroupsPager
     from googleapiclient.discovery import Resource
 
 PATH_DATA_SCAN = "projects/{project_id}/locations/{region}/dataScans/{data_scan_id}"
@@ -110,6 +118,14 @@ def get_dataplex_data_scan_client(self) -> DataScanServiceClient:
             credentials=self.get_credentials(), client_info=CLIENT_INFO, client_options=client_options
         )
 
+    def get_dataplex_catalog_client(self) -> CatalogServiceClient:
+        """Return CatalogServiceClient."""
+        client_options = ClientOptions(api_endpoint="dataplex.googleapis.com:443")
+
+        return CatalogServiceClient(
+            credentials=self.get_credentials(), client_info=CLIENT_INFO, client_options=client_options
+        )
+
     def wait_for_operation(self, timeout: float | None, operation: Operation):
         """Wait for long-lasting operation to complete."""
         try:
@@ -118,6 +134,200 @@ def wait_for_operation(self, timeout: float | None, operation: Operation):
             error = operation.exception(timeout=timeout)
             raise AirflowException(error)
 
+    @GoogleBaseHook.fallback_to_default_project_id
+    def create_entry_group(
+        self,
+        location: str,
+        entry_group_id: str,
+        entry_group_configuration: EntryGroup | dict,
+        project_id: str = PROVIDE_PROJECT_ID,
+        validate_only: bool = False,
+        retry: Retry | _MethodDefault = DEFAULT,
+        timeout: float | None = None,
+        metadata: Sequence[tuple[str, str]] = (),
+    ) -> Operation:
+        """
+        Create an EntryGroup resource.
+
+        :param location: Required. The ID of the Google Cloud location that the task belongs to.
+        :param entry_group_id: Required. EntryGroup identifier.
+        :param entry_group_configuration: Required. EntryGroup configuration body.
+        :param project_id: Optional. The ID of the Google Cloud project that the task belongs to.
+        :param validate_only: Optional. If set, performs request validation, but does not actually execute
+            the create request.
+        :param retry: Optional. A retry object used to retry requests. If `None` is specified, requests
+            will not be retried.
+        :param timeout: Optional. The amount of time, in seconds, to wait for the request to complete.
+            Note that if `retry` is specified, the timeout applies to each individual attempt.
+        :param metadata: Optional. Additional metadata that is provided to the method.
+        """
+        client = self.get_dataplex_catalog_client()
+        return client.create_entry_group(
+            request={
+                "parent": client.common_location_path(project_id, location),
+                "entry_group_id": entry_group_id,
+                "entry_group": entry_group_configuration,
+                "validate_only": validate_only,
+            },
+            retry=retry,
+            timeout=timeout,
+            metadata=metadata,
+        )
+
+    @GoogleBaseHook.fallback_to_default_project_id
+    def get_entry_group(
+        self,
+        location: str,
+        entry_group_id: str,
+        project_id: str = PROVIDE_PROJECT_ID,
+        retry: Retry | _MethodDefault = DEFAULT,
+        timeout: float | None = None,
+        metadata: Sequence[tuple[str, str]] = (),
+    ) -> EntryGroup:
+        """
+        Get an EntryGroup resource.
+
+        :param location: Required. The ID of the Google Cloud location that the task belongs to.
+        :param entry_group_id: Required. EntryGroup identifier.
+        :param project_id: Optional. The ID of the Google Cloud project that the task belongs to.
+        :param retry: Optional. A retry object used to retry requests. If `None` is specified, requests
+            will not be retried.
+        :param timeout: Optional. The amount of time, in seconds, to wait for the request to complete.
+            Note that if `retry` is specified, the timeout applies to each individual attempt.
+        :param metadata: Optional. Additional metadata that is provided to the method.
+        """
+        client = self.get_dataplex_catalog_client()
+        return client.get_entry_group(
+            request={
+                "name": client.entry_group_path(project_id, location, entry_group_id),
+            },
+            retry=retry,
+            timeout=timeout,
+            metadata=metadata,
+        )
+
+    @GoogleBaseHook.fallback_to_default_project_id
+    def delete_entry_group(
+        self,
+        location: str,
+        entry_group_id: str,
+        project_id: str = PROVIDE_PROJECT_ID,
+        retry: Retry | _MethodDefault = DEFAULT,
+        timeout: float | None = None,
+        metadata: Sequence[tuple[str, str]] = (),
+    ) -> Operation:
+        """
+        Delete an EntryGroup resource.
+
+        :param location: Required. The ID of the Google Cloud location that the task belongs to.
+        :param entry_group_id: Required. EntryGroup identifier.
+        :param project_id: Optional. The ID of the Google Cloud project that the task belongs to.
+        :param retry: Optional. A retry object used to retry requests. If `None` is specified, requests
+            will not be retried.
+        :param timeout: Optional. The amount of time, in seconds, to wait for the request to complete.
+            Note that if `retry` is specified, the timeout applies to each individual attempt.
+        :param metadata: Optional. Additional metadata that is provided to the method.
+        """
+        client = self.get_dataplex_catalog_client()
+        return client.delete_entry_group(
+            request={
+                "name": client.entry_group_path(project_id, location, entry_group_id),
+            },
+            retry=retry,
+            timeout=timeout,
+            metadata=metadata,
+        )
+
+    @GoogleBaseHook.fallback_to_default_project_id
+    def list_entry_groups(
+        self,
+        location: str,
+        filter_by: str | None = None,
+        order_by: str | None = None,
+        page_size: int | None = None,
+        page_token: str | None = None,
+        project_id: str = PROVIDE_PROJECT_ID,
+        retry: Retry | _MethodDefault = DEFAULT,
+        timeout: float | None = None,
+        metadata: Sequence[tuple[str, str]] = (),
+    ) -> ListEntryGroupsPager:
+        """
+        List EntryGroup resources from a specific location.
+
+        :param location: Required. The ID of the Google Cloud location that the task belongs to.
+        :param filter_by: Optional. Filter to apply on the list results.
+        :param order_by: Optional. Fields to order the results by.
+        :param page_size: Optional. Maximum number of EntryGroups to return on one page.
+        :param page_token: Optional. Token to retrieve the next page of results.
+        :param project_id: Optional. The ID of the Google Cloud project that the task belongs to.
+        :param retry: Optional. A retry object used to retry requests. If `None` is specified, requests
+            will not be retried.
+        :param timeout: Optional. The amount of time, in seconds, to wait for the request to complete.
+            Note that if `retry` is specified, the timeout applies to each individual attempt.
+        :param metadata: Optional. Additional metadata that is provided to the method.
+        """
+        client = self.get_dataplex_catalog_client()
+        return client.list_entry_groups(
+            request={
+                "parent": client.common_location_path(project_id, location),
+                "filter": filter_by,
+                "order_by": order_by,
+                "page_size": page_size,
+                "page_token": page_token,
+            },
+            retry=retry,
+            timeout=timeout,
+            metadata=metadata,
+        )
+
+    @GoogleBaseHook.fallback_to_default_project_id
+    def update_entry_group(
+        self,
+        location: str,
+        entry_group_id: str,
+        entry_group_configuration: dict | EntryGroup,
+        project_id: str = PROVIDE_PROJECT_ID,
+        update_mask: list[str] | FieldMask | None = None,
+        validate_only: bool | None = False,
+        retry: Retry | _MethodDefault = DEFAULT,
+        timeout: float | None = None,
+        metadata: Sequence[tuple[str, str]] = (),
+    ) -> Operation:
+        """
+        Update an EntryGroup resource.
+
+        :param entry_group_id: Required. ID of the EntryGroup to update.
+        :param entry_group_configuration: Required. The updated configuration body of the EntryGroup.
+        :param location: Required. The ID of the Google Cloud location that the task belongs to.
+        :param update_mask: Optional. Names of fields whose values to overwrite on an entry group.
+            If this parameter is absent or empty, all modifiable fields are overwritten. If such
+            fields are non-required and omitted in the request body, their values are emptied.
+        :param project_id: Optional. The ID of the Google Cloud project that the task belongs to.
+        :param validate_only: Optional. The service validates the request without performing any mutations.
+        :param retry: Optional. A retry object used to retry requests. If `None` is specified, requests
+            will not be retried.
+        :param timeout: Optional. The amount of time, in seconds, to wait for the request to complete.
+            Note that if `retry` is specified, the timeout applies to each individual attempt.
+        :param metadata: Optional. Additional metadata that is provided to the method.
+        """
+        client = self.get_dataplex_catalog_client()
+        _entry_group = (
+            deepcopy(entry_group_configuration)
+            if isinstance(entry_group_configuration, dict)
+            else EntryGroup.to_dict(entry_group_configuration)
+        )
+        _entry_group["name"] = client.entry_group_path(project_id, location, entry_group_id)
+        return client.update_entry_group(
+            request={
+                "entry_group": _entry_group,
+                "update_mask": FieldMask(paths=update_mask) if isinstance(update_mask, list) else update_mask,
+                "validate_only": validate_only,
+            },
+            retry=retry,
+            timeout=timeout,
+            metadata=metadata,
+        )
+
     @GoogleBaseHook.fallback_to_default_project_id
     def create_task(
         self,
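
Review note: `update_entry_group` prepares its request in three steps: it copies the caller's configuration, injects the full resource name, and wraps a plain list of field names in a `FieldMask`. The sketch below isolates that preparation in plain Python so it can be run without Google client libraries. `FieldMaskStandIn` and `build_update_request` are hypothetical names introduced here for illustration; they are not part of the commit or of any Google library.

```python
from __future__ import annotations

from copy import deepcopy
from dataclasses import dataclass, field


@dataclass
class FieldMaskStandIn:
    """Hypothetical stand-in for the protobuf FieldMask message."""

    paths: list[str] = field(default_factory=list)


def build_update_request(
    entry_group_configuration: dict,
    name: str,
    update_mask: list[str] | FieldMaskStandIn | None = None,
    validate_only: bool = False,
) -> dict:
    """Assemble an update request without mutating the caller's dict."""
    # Deep copy so that adding "name" does not leak into the caller's config.
    entry_group = deepcopy(entry_group_configuration)
    entry_group["name"] = name
    # A plain list of field names is wrapped; a ready-made mask or None passes through.
    if isinstance(update_mask, list):
        update_mask = FieldMaskStandIn(paths=update_mask)
    return {
        "entry_group": entry_group,
        "update_mask": update_mask,
        "validate_only": validate_only,
    }


if __name__ == "__main__":
    config = {"description": "Updated description"}
    request = build_update_request(
        config,
        name="projects/my-project/locations/us-central1/entryGroups/my-entry-group",
        update_mask=["description"],
    )
    print(request)
    print(config)  # unchanged: no "name" key was added to the original
```

The deep copy matters when the same configuration dict is reused across tasks or retries: without it, the injected `name` would persist in the caller's object.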
