From 1522339caa90e3c8d2640451c37b76615b593677 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Sergio=20G=C3=B3mez=20Villamor?= Date: Fri, 27 Dec 2024 13:32:39 +0100 Subject: [PATCH 01/10] chore(tableau): set ingestion stage report and pertimers --- .../ingestion/source/tableau/tableau.py | 75 +++++++++++++++---- 1 file changed, 62 insertions(+), 13 deletions(-) diff --git a/metadata-ingestion/src/datahub/ingestion/source/tableau/tableau.py b/metadata-ingestion/src/datahub/ingestion/source/tableau/tableau.py index df59cae3fad23..cc1a07020e8d1 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/tableau/tableau.py +++ b/metadata-ingestion/src/datahub/ingestion/source/tableau/tableau.py @@ -3,7 +3,7 @@ import re import time from collections import OrderedDict -from dataclasses import dataclass +from dataclasses import dataclass, field from datetime import datetime from functools import lru_cache from typing import ( @@ -117,6 +117,7 @@ ) from datahub.ingestion.source.tableau.tableau_server_wrapper import UserInfo from datahub.ingestion.source.tableau.tableau_validation import check_user_role +from datahub.ingestion.source_report.ingestion_stage import IngestionStageReport from datahub.metadata.com.linkedin.pegasus2avro.common import ( AuditStamp, ChangeAuditStamps, @@ -169,6 +170,8 @@ create_lineage_sql_parsed_result, ) from datahub.utilities import config_clean +from datahub.utilities.perf_timer import PerfTimer +from datahub.utilities.stats_collections import TopKDict from datahub.utilities.urns.dataset_urn import DatasetUrn try: @@ -636,12 +639,27 @@ class SiteIdContentUrl: site_content_url: str -class TableauSourceReport(StaleEntityRemovalSourceReport): +class TableauSourceReport( + StaleEntityRemovalSourceReport, + IngestionStageReport, +): get_all_datasources_query_failed: bool = False num_get_datasource_query_failures: int = 0 num_datasource_field_skipped_no_name: int = 0 num_csql_field_skipped_no_name: int = 0 num_table_field_skipped_no_name: int = 0 + # timers + extract_usage_stats_timer: Dict[str, float] = field(default_factory=TopKDict) + fetch_groups_timer: Dict[str, float] = field(default_factory=TopKDict) + populate_database_server_hostname_map_timer: Dict[str, float] = field(default_factory=TopKDict) + populate_projects_registry_timer: Dict[str, float] = field(default_factory=TopKDict) + emit_workbooks_timer: Dict[str, float] = field(default_factory=TopKDict) + emit_sheets_timer: Dict[str, float] = field(default_factory=TopKDict) + emit_dashboards_timer: Dict[str, float] = field(default_factory=TopKDict) + emit_embedded_datasources_timer: Dict[str, float] = field(default_factory=TopKDict) + emit_published_datasources_timer: Dict[str, float] = field(default_factory=TopKDict) + emit_custom_sql_datasources_timer: Dict[str, float] = field(default_factory=TopKDict) + emit_upstream_tables_timer: Dict[str, float] = field(default_factory=TopKDict) # lineage num_tables_with_upstream_lineage: int = 0 num_upstream_table_lineage: int = 0 @@ -3457,33 +3475,64 @@ def _create_workbook_properties( return {"permissions": json.dumps(groups)} if len(groups) > 0 else None def ingest_tableau_site(self): + self.report.report_ingestion_stage_start(f"Ingesting Tableau Site: {self.site_id} {self.site_content_url}") + # Initialise the dictionary to later look-up for chart and dashboard stat if self.config.extract_usage_stats: - self._populate_usage_stat_registry() + with PerfTimer() as timer: + self._populate_usage_stat_registry() + self.report.extract_usage_stats_timer[self.site_id] = round(timer.elapsed_seconds(), 2) if self.config.permission_ingestion: - self._fetch_groups() + with PerfTimer() as timer: + self._fetch_groups() + self.report.fetch_groups_timer[self.site_id] = round(timer.elapsed_seconds(), 2) # Populate the map of database names and database hostnames to be used later to map # databases to platform instances. if self.config.database_hostname_to_platform_instance_map: - self._populate_database_server_hostname_map() + with PerfTimer() as timer: + self._populate_database_server_hostname_map() + self.report.populate_database_server_hostname_map_timer[self.site_id] = round(timer.elapsed_seconds(), 2) - self._populate_projects_registry() + with PerfTimer() as timer: + self._populate_projects_registry() + self.report.populate_projects_registry_timer[self.site_id] = round(timer.elapsed_seconds(), 2) if self.config.add_site_container: yield from self.emit_site_container() yield from self.emit_project_containers() - yield from self.emit_workbooks() + + with PerfTimer() as timer: + yield from self.emit_workbooks() + self.report.emit_workbooks_timer[self.site_id] = round(timer.elapsed_seconds(), 2) + if self.sheet_ids: - yield from self.emit_sheets() + with PerfTimer() as timer: + yield from self.emit_sheets() + self.report.emit_sheets_timer[self.site_id] = round(timer.elapsed_seconds(), 2) + if self.dashboard_ids: - yield from self.emit_dashboards() + with PerfTimer() as timer: + yield from self.emit_dashboards() + self.report.emit_dashboards_timer[self.site_id] = round(timer.elapsed_seconds(), 2) + if self.embedded_datasource_ids_being_used: - yield from self.emit_embedded_datasources() + with PerfTimer() as timer: + yield from self.emit_embedded_datasources() + self.report.emit_embedded_datasources_timer[self.site_id] = round(timer.elapsed_seconds(), 2) + if self.datasource_ids_being_used: - yield from self.emit_published_datasources() + with PerfTimer() as timer: + yield from self.emit_published_datasources() + self.report.emit_published_datasources_timer[self.site_id] = round(timer.elapsed_seconds(), 2) + if self.custom_sql_ids_being_used: - yield from self.emit_custom_sql_datasources() + with PerfTimer() as timer: + yield from self.emit_custom_sql_datasources() + self.report.emit_custom_sql_datasources_timer[self.site_id] = round(timer.elapsed_seconds(), 2) + if self.database_tables: - yield from self.emit_upstream_tables() + with PerfTimer() as timer: + yield from self.emit_upstream_tables() + self.report.emit_upstream_tables_timer[self.site_id] = round(timer.elapsed_seconds(), 2) From 1afb4e7bebba43845cd48e9edc1fd2836246f39b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Sergio=20G=C3=B3mez=20Villamor?= Date: Fri, 27 Dec 2024 13:58:04 +0100 Subject: [PATCH 02/10] fixup --- .../src/datahub/ingestion/source/tableau/tableau.py | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/metadata-ingestion/src/datahub/ingestion/source/tableau/tableau.py b/metadata-ingestion/src/datahub/ingestion/source/tableau/tableau.py index cc1a07020e8d1..62ee721db9316 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/tableau/tableau.py +++ b/metadata-ingestion/src/datahub/ingestion/source/tableau/tableau.py @@ -639,6 +639,7 @@ class SiteIdContentUrl: site_content_url: str +@dataclass class TableauSourceReport( StaleEntityRemovalSourceReport, IngestionStageReport, @@ -670,7 +671,7 @@ class TableauSourceReport( num_upstream_table_lineage_failed_parse_sql: int = 0 num_upstream_fine_grained_lineage_failed_parse_sql: int = 0 num_hidden_assets_skipped: int = 0 - logged_in_user: List[UserInfo] = [] + logged_in_user: List[UserInfo] = field(default_factory=list) def report_user_role(report: TableauSourceReport, server: Server) -> None: @@ -834,6 +835,9 @@ def get_workunits_internal(self) -> Iterable[MetadataWorkUnit]: platform=self.platform, ) yield from site_source.ingest_tableau_site() + + self.report.report_ingestion_stage_start("End") + except MetadataQueryException as md_exception: self.report.failure( title="Failed to Retrieve Tableau Metadata", @@ -3535,4 +3539,4 @@ def ingest_tableau_site(self): if self.database_tables: with PerfTimer() as timer: yield from self.emit_upstream_tables() - self.report.emit_upstream_tables_timer[self.site_id] = round(timer.elapsed_seconds(), 2) + self.report.emit_upstream_tables_timer[self.site_id] = round(timer.elapsed_seconds(), 2) \ No newline at end of file From fbc6d8e7d31fb22ddae705758d175a717d2556a3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Sergio=20G=C3=B3mez=20Villamor?= Date: Fri, 27 Dec 2024 15:16:17 +0100 Subject: [PATCH 03/10] fix lint --- .../ingestion/source/tableau/tableau.py | 90 +++++++++++++------ 1 file changed, 64 insertions(+), 26 deletions(-) diff --git a/metadata-ingestion/src/datahub/ingestion/source/tableau/tableau.py b/metadata-ingestion/src/datahub/ingestion/source/tableau/tableau.py index 62ee721db9316..72c104147a00c 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/tableau/tableau.py +++ b/metadata-ingestion/src/datahub/ingestion/source/tableau/tableau.py @@ -3,7 +3,7 @@ import re import time from collections import OrderedDict -from dataclasses import dataclass, field +from dataclasses import dataclass, field as dataclass_field from datetime import datetime from functools import lru_cache from typing import ( @@ -650,17 +650,31 @@ class TableauSourceReport( num_csql_field_skipped_no_name: int = 0 num_table_field_skipped_no_name: int = 0 # timers - extract_usage_stats_timer: Dict[str, float] = field(default_factory=TopKDict) - fetch_groups_timer: Dict[str, float] = field(default_factory=TopKDict) - populate_database_server_hostname_map_timer: Dict[str, float] = field(default_factory=TopKDict) - populate_projects_registry_timer: Dict[str, float] = field(default_factory=TopKDict) - emit_workbooks_timer: Dict[str, float] = field(default_factory=TopKDict) - emit_sheets_timer: Dict[str, float] = field(default_factory=TopKDict) - emit_dashboards_timer: Dict[str, float] = field(default_factory=TopKDict) - emit_embedded_datasources_timer: Dict[str, float] = field(default_factory=TopKDict) - emit_published_datasources_timer: Dict[str, float] = field(default_factory=TopKDict) - emit_custom_sql_datasources_timer: Dict[str, float] = field(default_factory=TopKDict) - emit_upstream_tables_timer: Dict[str, float] = field(default_factory=TopKDict) + extract_usage_stats_timer: Dict[str, float] = dataclass_field( + default_factory=TopKDict + ) + fetch_groups_timer: Dict[str, float] = dataclass_field(default_factory=TopKDict) + populate_database_server_hostname_map_timer: Dict[str, float] = dataclass_field( + default_factory=TopKDict + ) + populate_projects_registry_timer: Dict[str, float] = dataclass_field( + default_factory=TopKDict + ) + emit_workbooks_timer: Dict[str, float] = dataclass_field(default_factory=TopKDict) + emit_sheets_timer: Dict[str, float] = dataclass_field(default_factory=TopKDict) + emit_dashboards_timer: Dict[str, float] = dataclass_field(default_factory=TopKDict) + emit_embedded_datasources_timer: Dict[str, float] = dataclass_field( + default_factory=TopKDict + ) + emit_published_datasources_timer: Dict[str, float] = dataclass_field( + default_factory=TopKDict + ) + emit_custom_sql_datasources_timer: Dict[str, float] = dataclass_field( + default_factory=TopKDict + ) + emit_upstream_tables_timer: Dict[str, float] = dataclass_field( + default_factory=TopKDict + ) # lineage num_tables_with_upstream_lineage: int = 0 num_upstream_table_lineage: int = 0 @@ -671,7 +685,7 @@ class TableauSourceReport( num_upstream_table_lineage_failed_parse_sql: int = 0 num_upstream_fine_grained_lineage_failed_parse_sql: int = 0 num_hidden_assets_skipped: int = 0 - logged_in_user: List[UserInfo] = field(default_factory=list) + logged_in_user: List[UserInfo] = dataclass_field(default_factory=list) def report_user_role(report: TableauSourceReport, server: Server) -> None: @@ -837,7 +851,7 @@ def get_workunits_internal(self) -> Iterable[MetadataWorkUnit]: yield from site_source.ingest_tableau_site() self.report.report_ingestion_stage_start("End") - + except MetadataQueryException as md_exception: self.report.failure( title="Failed to Retrieve Tableau Metadata", @@ -3479,29 +3493,39 @@ def _create_workbook_properties( return {"permissions": json.dumps(groups)} if len(groups) > 0 else None def ingest_tableau_site(self): - self.report.report_ingestion_stage_start(f"Ingesting Tableau Site: {self.site_id} {self.site_content_url}") + self.report.report_ingestion_stage_start( + f"Ingesting Tableau Site: {self.site_id} {self.site_content_url}" + ) # Initialise the dictionary to later look-up for chart and dashboard stat if self.config.extract_usage_stats: with PerfTimer() as timer: self._populate_usage_stat_registry() - self.report.extract_usage_stats_timer[self.site_id] = round(timer.elapsed_seconds(), 2) + self.report.extract_usage_stats_timer[self.site_id] = round( + timer.elapsed_seconds(), 2 + ) if self.config.permission_ingestion: with PerfTimer() as timer: self._fetch_groups() - self.report.fetch_groups_timer[self.site_id] = round(timer.elapsed_seconds(), 2) + self.report.fetch_groups_timer[self.site_id] = round( + timer.elapsed_seconds(), 2 + ) # Populate the map of database names and database hostnames to be used later to map # databases to platform instances. if self.config.database_hostname_to_platform_instance_map: with PerfTimer() as timer: self._populate_database_server_hostname_map() - self.report.populate_database_server_hostname_map_timer[self.site_id] = round(timer.elapsed_seconds(), 2) + self.report.populate_database_server_hostname_map_timer[ + self.site_id + ] = round(timer.elapsed_seconds(), 2) with PerfTimer() as timer: self._populate_projects_registry() - self.report.populate_projects_registry_timer[self.site_id] = round(timer.elapsed_seconds(), 2) + self.report.populate_projects_registry_timer[self.site_id] = round( + timer.elapsed_seconds(), 2 + ) if self.config.add_site_container: yield from self.emit_site_container() @@ -3509,34 +3533,48 @@ def ingest_tableau_site(self): with PerfTimer() as timer: yield from self.emit_workbooks() - self.report.emit_workbooks_timer[self.site_id] = round(timer.elapsed_seconds(), 2) + self.report.emit_workbooks_timer[self.site_id] = round( + timer.elapsed_seconds(), 2 + ) if self.sheet_ids: with PerfTimer() as timer: yield from self.emit_sheets() - self.report.emit_sheets_timer[self.site_id] = round(timer.elapsed_seconds(), 2) + self.report.emit_sheets_timer[self.site_id] = round( + timer.elapsed_seconds(), 2 + ) if self.dashboard_ids: with PerfTimer() as timer: yield from self.emit_dashboards() - self.report.emit_dashboards_timer[self.site_id] = round(timer.elapsed_seconds(), 2) + self.report.emit_dashboards_timer[self.site_id] = round( + timer.elapsed_seconds(), 2 + ) if self.embedded_datasource_ids_being_used: with PerfTimer() as timer: yield from self.emit_embedded_datasources() - self.report.emit_embedded_datasources_timer[self.site_id] = round(timer.elapsed_seconds(), 2) + self.report.emit_embedded_datasources_timer[self.site_id] = round( + timer.elapsed_seconds(), 2 + ) if self.datasource_ids_being_used: with PerfTimer() as timer: yield from self.emit_published_datasources() - self.report.emit_published_datasources_timer[self.site_id] = round(timer.elapsed_seconds(), 2) + self.report.emit_published_datasources_timer[self.site_id] = round( + timer.elapsed_seconds(), 2 + ) if self.custom_sql_ids_being_used: with PerfTimer() as timer: yield from self.emit_custom_sql_datasources() - self.report.emit_custom_sql_datasources_timer[self.site_id] = round(timer.elapsed_seconds(), 2) + self.report.emit_custom_sql_datasources_timer[self.site_id] = round( + timer.elapsed_seconds(), 2 + ) if self.database_tables: with PerfTimer() as timer: yield from self.emit_upstream_tables() - self.report.emit_upstream_tables_timer[self.site_id] = round(timer.elapsed_seconds(), 2) \ No newline at end of file + self.report.emit_upstream_tables_timer[self.site_id] = round( + timer.elapsed_seconds(), 2 + ) From 332953fa94f4716fc8be5eadf660f7bdab0a47ec Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Sergio=20G=C3=B3mez=20Villamor?= Date: Thu, 2 Jan 2025 10:52:56 +0100 Subject: [PATCH 04/10] automatically close latest ongoing stage if any --- .../src/datahub/ingestion/api/source.py | 2 ++ .../datahub/ingestion/source/tableau/tableau.py | 5 +++-- .../ingestion/source_report/ingestion_stage.py | 17 ++++++++++------- .../src/datahub/utilities/perf_timer.py | 9 ++++++--- 4 files changed, 21 insertions(+), 12 deletions(-) diff --git a/metadata-ingestion/src/datahub/ingestion/api/source.py b/metadata-ingestion/src/datahub/ingestion/api/source.py index c3638635b19aa..d3adfe0069b2a 100644 --- a/metadata-ingestion/src/datahub/ingestion/api/source.py +++ b/metadata-ingestion/src/datahub/ingestion/api/source.py @@ -331,6 +331,8 @@ def as_obj(self) -> dict: } def compute_stats(self) -> None: + super().compute_stats() + duration = datetime.datetime.now() - self.start_time workunits_produced = self.events_produced if duration.total_seconds() > 0: diff --git a/metadata-ingestion/src/datahub/ingestion/source/tableau/tableau.py b/metadata-ingestion/src/datahub/ingestion/source/tableau/tableau.py index 72c104147a00c..c9ca383a09c57 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/tableau/tableau.py +++ b/metadata-ingestion/src/datahub/ingestion/source/tableau/tableau.py @@ -687,6 +687,9 @@ class TableauSourceReport( num_hidden_assets_skipped: int = 0 logged_in_user: List[UserInfo] = dataclass_field(default_factory=list) + def compute_stats(self) -> None: + self.close_stage() + def report_user_role(report: TableauSourceReport, server: Server) -> None: title: str = "Insufficient Permissions" @@ -850,8 +853,6 @@ def get_workunits_internal(self) -> Iterable[MetadataWorkUnit]: ) yield from site_source.ingest_tableau_site() - self.report.report_ingestion_stage_start("End") - except MetadataQueryException as md_exception: self.report.failure( title="Failed to Retrieve Tableau Metadata", diff --git a/metadata-ingestion/src/datahub/ingestion/source_report/ingestion_stage.py b/metadata-ingestion/src/datahub/ingestion/source_report/ingestion_stage.py index ce683e64b3f46..81972b62074c4 100644 --- a/metadata-ingestion/src/datahub/ingestion/source_report/ingestion_stage.py +++ b/metadata-ingestion/src/datahub/ingestion/source_report/ingestion_stage.py @@ -25,12 +25,10 @@ class IngestionStageReport: ingestion_stage: Optional[str] = None ingestion_stage_durations: TopKDict[str, float] = field(default_factory=TopKDict) - _timer: Optional[PerfTimer] = field( - default=None, init=False, repr=False, compare=False - ) + _timer: PerfTimer = PerfTimer() - def report_ingestion_stage_start(self, stage: str) -> None: - if self._timer: + def _close_stage(self) -> None: + if self._timer.is_running(): elapsed = round(self._timer.elapsed_seconds(), 2) logger.info( f"Time spent in stage <{self.ingestion_stage}>: {elapsed} seconds", @@ -38,9 +36,14 @@ def report_ingestion_stage_start(self, stage: str) -> None: ) if self.ingestion_stage: self.ingestion_stage_durations[self.ingestion_stage] = elapsed - else: - self._timer = PerfTimer() + + def report_ingestion_stage_start(self, stage: str) -> None: + self._close_stage() self.ingestion_stage = f"{stage} at {datetime.now(timezone.utc)}" logger.info(f"Stage started: {self.ingestion_stage}") self._timer.start() + + def close_stage(self) -> None: + # just close ongoing stage if any + self._close_stage() diff --git a/metadata-ingestion/src/datahub/utilities/perf_timer.py b/metadata-ingestion/src/datahub/utilities/perf_timer.py index 9488683d6d8ca..66cd67062a7a4 100644 --- a/metadata-ingestion/src/datahub/utilities/perf_timer.py +++ b/metadata-ingestion/src/datahub/utilities/perf_timer.py @@ -70,6 +70,11 @@ def elapsed_seconds(self) -> float: return (self.end_time - self.start_time) + self._past_active_time def assert_timer_is_running(self) -> None: + if not self.is_running(): + self._error_state = True + logger.warning("Did you forget to start the timer ?") + + def is_running(self) -> bool: """ Returns true if timer is in running state. Timer is in NOT in running state if @@ -77,9 +82,7 @@ def assert_timer_is_running(self) -> None: 2. it is in paused state. 3. it had been started and finished in the past but not started again. """ - if self.start_time is None or self.paused or self.end_time: - self._error_state = True - logger.warning("Did you forget to start the timer ?") + return self.start_time is not None and not self.paused and self.end_time is None def __repr__(self) -> str: return repr(self.as_obj()) From 0b13bfc397a04a9e024bdfa5672654d062acc5fa Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Sergio=20G=C3=B3mez=20Villamor?= Date: Thu, 2 Jan 2025 11:08:58 +0100 Subject: [PATCH 05/10] use site_content_url and round digits parameter to perftimer method --- .../ingestion/source/tableau/tableau.py | 64 +++++++++---------- .../src/datahub/utilities/perf_timer.py | 8 ++- 2 files changed, 37 insertions(+), 35 deletions(-) diff --git a/metadata-ingestion/src/datahub/ingestion/source/tableau/tableau.py b/metadata-ingestion/src/datahub/ingestion/source/tableau/tableau.py index c9ca383a09c57..5c0378eff77b2 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/tableau/tableau.py +++ b/metadata-ingestion/src/datahub/ingestion/source/tableau/tableau.py @@ -3502,16 +3502,16 @@ def ingest_tableau_site(self): if self.config.extract_usage_stats: with PerfTimer() as timer: self._populate_usage_stat_registry() - self.report.extract_usage_stats_timer[self.site_id] = round( - timer.elapsed_seconds(), 2 - ) + self.report.extract_usage_stats_timer[ + self.site_content_url + ] = timer.elapsed_seconds(digits=2) if self.config.permission_ingestion: with PerfTimer() as timer: self._fetch_groups() - self.report.fetch_groups_timer[self.site_id] = round( - timer.elapsed_seconds(), 2 - ) + self.report.fetch_groups_timer[ + self.site_content_url + ] = timer.elapsed_seconds(digits=2) # Populate the map of database names and database hostnames to be used later to map # databases to platform instances. @@ -3519,14 +3519,14 @@ def ingest_tableau_site(self): with PerfTimer() as timer: self._populate_database_server_hostname_map() self.report.populate_database_server_hostname_map_timer[ - self.site_id - ] = round(timer.elapsed_seconds(), 2) + self.site_content_url + ] = timer.elapsed_seconds(digits=2) with PerfTimer() as timer: self._populate_projects_registry() - self.report.populate_projects_registry_timer[self.site_id] = round( - timer.elapsed_seconds(), 2 - ) + self.report.populate_projects_registry_timer[ + self.site_content_url + ] = timer.elapsed_seconds(digits=2) if self.config.add_site_container: yield from self.emit_site_container() @@ -3534,48 +3534,48 @@ def ingest_tableau_site(self): with PerfTimer() as timer: yield from self.emit_workbooks() - self.report.emit_workbooks_timer[self.site_id] = round( - timer.elapsed_seconds(), 2 - ) + self.report.emit_workbooks_timer[ + self.site_content_url + ] = timer.elapsed_seconds(digits=2) if self.sheet_ids: with PerfTimer() as timer: yield from self.emit_sheets() - self.report.emit_sheets_timer[self.site_id] = round( - timer.elapsed_seconds(), 2 - ) + self.report.emit_sheets_timer[ + self.site_content_url + ] = timer.elapsed_seconds(digits=2) if self.dashboard_ids: with PerfTimer() as timer: yield from self.emit_dashboards() - self.report.emit_dashboards_timer[self.site_id] = round( - timer.elapsed_seconds(), 2 - ) + self.report.emit_dashboards_timer[ + self.site_content_url + ] = timer.elapsed_seconds(digits=2) if self.embedded_datasource_ids_being_used: with PerfTimer() as timer: yield from self.emit_embedded_datasources() - self.report.emit_embedded_datasources_timer[self.site_id] = round( - timer.elapsed_seconds(), 2 - ) + self.report.emit_embedded_datasources_timer[ + self.site_content_url + ] = timer.elapsed_seconds(digits=2) if self.datasource_ids_being_used: with PerfTimer() as timer: yield from self.emit_published_datasources() - self.report.emit_published_datasources_timer[self.site_id] = round( - timer.elapsed_seconds(), 2 - ) + self.report.emit_published_datasources_timer[ + self.site_content_url + ] = timer.elapsed_seconds(digits=2) if self.custom_sql_ids_being_used: with PerfTimer() as timer: yield from self.emit_custom_sql_datasources() - self.report.emit_custom_sql_datasources_timer[self.site_id] = round( - timer.elapsed_seconds(), 2 - ) + self.report.emit_custom_sql_datasources_timer[ + self.site_content_url + ] = timer.elapsed_seconds(digits=2) if self.database_tables: with PerfTimer() as timer: yield from self.emit_upstream_tables() - self.report.emit_upstream_tables_timer[self.site_id] = round( - timer.elapsed_seconds(), 2 - ) + self.report.emit_upstream_tables_timer[ + self.site_content_url + ] = timer.elapsed_seconds(digits=2) diff --git a/metadata-ingestion/src/datahub/utilities/perf_timer.py b/metadata-ingestion/src/datahub/utilities/perf_timer.py index 66cd67062a7a4..99bd1ca0058c9 100644 --- a/metadata-ingestion/src/datahub/utilities/perf_timer.py +++ b/metadata-ingestion/src/datahub/utilities/perf_timer.py @@ -57,7 +57,7 @@ def __exit__( self.finish() return None - def elapsed_seconds(self) -> float: + def elapsed_seconds(self, digits: Optional[int] = 5) -> float: """ Returns the elapsed time in seconds. """ @@ -65,9 +65,11 @@ def elapsed_seconds(self) -> float: return self._past_active_time if self.end_time is None: - return (time.perf_counter() - self.start_time) + (self._past_active_time) + elapsed = (time.perf_counter() - self.start_time) + (self._past_active_time) else: - return (self.end_time - self.start_time) + self._past_active_time + elapsed = (self.end_time - self.start_time) + self._past_active_time + + return round(elapsed, digits) if digits else elapsed def assert_timer_is_running(self) -> None: if not self.is_running(): From 5d42781e4c7cbed7c23e0bdc66f63666d115d439 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Sergio=20G=C3=B3mez=20Villamor?= Date: Thu, 2 Jan 2025 15:15:33 +0100 Subject: [PATCH 06/10] fix default --- metadata-ingestion/src/datahub/utilities/perf_timer.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/metadata-ingestion/src/datahub/utilities/perf_timer.py b/metadata-ingestion/src/datahub/utilities/perf_timer.py index 99bd1ca0058c9..b91b93b0eb308 100644 --- a/metadata-ingestion/src/datahub/utilities/perf_timer.py +++ b/metadata-ingestion/src/datahub/utilities/perf_timer.py @@ -57,7 +57,7 @@ def __exit__( self.finish() return None - def elapsed_seconds(self, digits: Optional[int] = 5) -> float: + def elapsed_seconds(self, digits: Optional[int] = None) -> float: """ Returns the elapsed time in seconds. """ From 2fe9a1ca5328ebea7ebd38f9c7964a99229a1110 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Sergio=20G=C3=B3mez=20Villamor?= Date: Fri, 3 Jan 2025 11:02:08 +0100 Subject: [PATCH 07/10] revert updates in IngestionStageReport and implement a new one using ContextManager --- .../ingestion/source/tableau/tableau.py | 154 +++++++++--------- .../source_report/ingestion_stage.py | 46 +++++- .../unit/reporting/test_ingestion_stage.py | 42 +++++ 3 files changed, 154 insertions(+), 88 deletions(-) create mode 100644 metadata-ingestion/tests/unit/reporting/test_ingestion_stage.py diff --git a/metadata-ingestion/src/datahub/ingestion/source/tableau/tableau.py b/metadata-ingestion/src/datahub/ingestion/source/tableau/tableau.py index bd52fbdfa1e11..b1c5d1c79f082 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/tableau/tableau.py +++ b/metadata-ingestion/src/datahub/ingestion/source/tableau/tableau.py @@ -117,7 +117,7 @@ ) from datahub.ingestion.source.tableau.tableau_server_wrapper import UserInfo from datahub.ingestion.source.tableau.tableau_validation import check_user_role -from datahub.ingestion.source_report.ingestion_stage import IngestionStageReport +from datahub.ingestion.source_report.ingestion_stage import IngestionStageContextReport from datahub.metadata.com.linkedin.pegasus2avro.common import ( AuditStamp, ChangeAuditStamps, @@ -642,7 +642,7 @@ class SiteIdContentUrl: @dataclass class TableauSourceReport( StaleEntityRemovalSourceReport, - IngestionStageReport, + IngestionStageContextReport, ): get_all_datasources_query_failed: bool = False num_get_datasource_query_failures: int = 0 @@ -687,9 +687,6 @@ class TableauSourceReport( num_hidden_assets_skipped: int = 0 logged_in_user: List[UserInfo] = dataclass_field(default_factory=list) - def compute_stats(self) -> None: - self.close_stage() - def report_user_role(report: TableauSourceReport, server: Server) -> None: title: str = "Insufficient Permissions" @@ -3491,88 +3488,87 @@ def _create_workbook_properties( return {"permissions": json.dumps(groups)} if len(groups) > 0 else None def ingest_tableau_site(self): - self.report.report_ingestion_stage_start( + with self.report.new_stage( f"Ingesting Tableau Site: {self.site_id} {self.site_content_url}" - ) - - # Initialise the dictionary to later look-up for chart and dashboard stat - if self.config.extract_usage_stats: - with PerfTimer() as timer: - self._populate_usage_stat_registry() - self.report.extract_usage_stats_timer[ - self.site_content_url - ] = timer.elapsed_seconds(digits=2) - - if self.config.permission_ingestion: - with PerfTimer() as timer: - self._fetch_groups() - self.report.fetch_groups_timer[ - self.site_content_url - ] = timer.elapsed_seconds(digits=2) - - # Populate the map of database names and database hostnames to be used later to map - # databases to platform instances. - if self.config.database_hostname_to_platform_instance_map: - with PerfTimer() as timer: - self._populate_database_server_hostname_map() - self.report.populate_database_server_hostname_map_timer[ - self.site_content_url - ] = timer.elapsed_seconds(digits=2) - - with PerfTimer() as timer: - self._populate_projects_registry() - self.report.populate_projects_registry_timer[ - self.site_content_url - ] = timer.elapsed_seconds(digits=2) - - if self.config.add_site_container: - yield from self.emit_site_container() - yield from self.emit_project_containers() - - with PerfTimer() as timer: - yield from self.emit_workbooks() - self.report.emit_workbooks_timer[ - self.site_content_url - ] = timer.elapsed_seconds(digits=2) - - if self.sheet_ids: - with PerfTimer() as timer: - yield from self.emit_sheets() - self.report.emit_sheets_timer[ - self.site_content_url - ] = timer.elapsed_seconds(digits=2) - - if self.dashboard_ids: - with PerfTimer() as timer: - yield from self.emit_dashboards() - self.report.emit_dashboards_timer[ - self.site_content_url - ] = timer.elapsed_seconds(digits=2) + ): + # Initialise the dictionary to later look-up for chart and dashboard stat + if self.config.extract_usage_stats: + with PerfTimer() as timer: + self._populate_usage_stat_registry() + self.report.extract_usage_stats_timer[ + self.site_content_url + ] = timer.elapsed_seconds(digits=2) + + if self.config.permission_ingestion: + with PerfTimer() as timer: + self._fetch_groups() + self.report.fetch_groups_timer[ + self.site_content_url + ] = timer.elapsed_seconds(digits=2) + + # Populate the map of database names and database hostnames to be used later to map + # databases to platform instances. + if self.config.database_hostname_to_platform_instance_map: + with PerfTimer() as timer: + self._populate_database_server_hostname_map() + self.report.populate_database_server_hostname_map_timer[ + self.site_content_url + ] = timer.elapsed_seconds(digits=2) - if self.embedded_datasource_ids_being_used: with PerfTimer() as timer: - yield from self.emit_embedded_datasources() - self.report.emit_embedded_datasources_timer[ + self._populate_projects_registry() + self.report.populate_projects_registry_timer[ self.site_content_url ] = timer.elapsed_seconds(digits=2) - if self.datasource_ids_being_used: - with PerfTimer() as timer: - yield from self.emit_published_datasources() - self.report.emit_published_datasources_timer[ - self.site_content_url - ] = timer.elapsed_seconds(digits=2) + if self.config.add_site_container: + yield from self.emit_site_container() + yield from self.emit_project_containers() - if self.custom_sql_ids_being_used: with PerfTimer() as timer: - yield from self.emit_custom_sql_datasources() - self.report.emit_custom_sql_datasources_timer[ + yield from self.emit_workbooks() + self.report.emit_workbooks_timer[ self.site_content_url ] = timer.elapsed_seconds(digits=2) - if self.database_tables: - with PerfTimer() as timer: - yield from self.emit_upstream_tables() - self.report.emit_upstream_tables_timer[ - self.site_content_url - ] = timer.elapsed_seconds(digits=2) + if self.sheet_ids: + with PerfTimer() as timer: + yield from self.emit_sheets() + self.report.emit_sheets_timer[ + self.site_content_url + ] = timer.elapsed_seconds(digits=2) + + if self.dashboard_ids: + with PerfTimer() as timer: + yield from self.emit_dashboards() + self.report.emit_dashboards_timer[ + self.site_content_url + ] = timer.elapsed_seconds(digits=2) + + if self.embedded_datasource_ids_being_used: + with PerfTimer() as timer: + yield from self.emit_embedded_datasources() + self.report.emit_embedded_datasources_timer[ + self.site_content_url + ] = timer.elapsed_seconds(digits=2) + + if self.datasource_ids_being_used: + with PerfTimer() as timer: + yield from self.emit_published_datasources() + self.report.emit_published_datasources_timer[ + self.site_content_url + ] = timer.elapsed_seconds(digits=2) + + if self.custom_sql_ids_being_used: + with PerfTimer() as timer: + yield from self.emit_custom_sql_datasources() + self.report.emit_custom_sql_datasources_timer[ + self.site_content_url + ] = timer.elapsed_seconds(digits=2) + + if self.database_tables: + with PerfTimer() as timer: + yield from self.emit_upstream_tables() + self.report.emit_upstream_tables_timer[ + self.site_content_url + ] = timer.elapsed_seconds(digits=2) diff --git a/metadata-ingestion/src/datahub/ingestion/source_report/ingestion_stage.py b/metadata-ingestion/src/datahub/ingestion/source_report/ingestion_stage.py index 81972b62074c4..5d2870d41c686 100644 --- a/metadata-ingestion/src/datahub/ingestion/source_report/ingestion_stage.py +++ b/metadata-ingestion/src/datahub/ingestion/source_report/ingestion_stage.py @@ -1,4 +1,5 @@ import logging +from contextlib import AbstractContextManager from dataclasses import dataclass, field from datetime import datetime, timezone from typing import Optional @@ -25,10 +26,12 @@ class IngestionStageReport: ingestion_stage: Optional[str] = None ingestion_stage_durations: TopKDict[str, float] = field(default_factory=TopKDict) - _timer: PerfTimer = PerfTimer() + _timer: Optional[PerfTimer] = field( + default=None, init=False, repr=False, compare=False + ) - def _close_stage(self) -> None: - if self._timer.is_running(): + def report_ingestion_stage_start(self, stage: str) -> None: + if self._timer: elapsed = round(self._timer.elapsed_seconds(), 2) logger.info( f"Time spent in stage <{self.ingestion_stage}>: {elapsed} seconds", @@ -36,14 +39,39 @@ def _close_stage(self) -> None: ) if self.ingestion_stage: self.ingestion_stage_durations[self.ingestion_stage] = elapsed - - def report_ingestion_stage_start(self, stage: str) -> None: - self._close_stage() + else: + self._timer = PerfTimer() self.ingestion_stage = f"{stage} at {datetime.now(timezone.utc)}" logger.info(f"Stage started: {self.ingestion_stage}") self._timer.start() - def close_stage(self) -> None: - # just close ongoing stage if any - self._close_stage() + +@dataclass +class IngestionStageContextReport: + ingestion_stage_durations: TopKDict[str, float] = field(default_factory=TopKDict) + + def new_stage(self, stage: str) -> "IngestionStageContext": + return IngestionStageContext(stage, self) + + +@dataclass +class IngestionStageContext(AbstractContextManager): + def __init__(self, stage: str, report: IngestionStageContextReport): + self._ingestion_stage = f"{stage} at {datetime.now(timezone.utc)}" + self._timer: PerfTimer = PerfTimer() + self._report = report + + def __enter__(self) -> "IngestionStageContext": + logger.info(f"Stage started: {self._ingestion_stage}") + self._timer.start() + return self + + def __exit__(self, exc_type, exc_val, exc_tb): + elapsed = self._timer.elapsed_seconds(digits=2) + logger.info( + f"Time spent in stage <{self._ingestion_stage}>: {elapsed} seconds", + stacklevel=2, + ) + self._report.ingestion_stage_durations[self._ingestion_stage] = elapsed + return None diff --git a/metadata-ingestion/tests/unit/reporting/test_ingestion_stage.py b/metadata-ingestion/tests/unit/reporting/test_ingestion_stage.py new file mode 100644 index 0000000000000..7c62214323bda --- /dev/null +++ b/metadata-ingestion/tests/unit/reporting/test_ingestion_stage.py @@ -0,0 +1,42 @@ +import time + +from datahub.ingestion.source_report.ingestion_stage import IngestionStageContextReport + + +def test_ingestion_stage_context_records_duration(): + report = IngestionStageContextReport() + with report.new_stage(stage="Test Stage"): + pass + assert len(report._ingestion_stage_durations) == 1 + assert "Test Stage" in next(iter(report._ingestion_stage_durations.keys())) + + +def test_ingestion_stage_context_handles_exceptions(): + report = IngestionStageContextReport() + try: + with report.new_stage(stage="Test Stage"): + raise ValueError("Test Exception") + except ValueError: + pass + assert len(report._ingestion_stage_durations) == 1 + assert "Test Stage" in next(iter(report._ingestion_stage_durations)) + + +def test_ingestion_stage_context_report_handles_multiple_stages(): + report = IngestionStageContextReport() + with report.new_stage(stage="Test Stage 1"): + time.sleep(0.1) + with report.new_stage(stage="Test Stage 2"): + time.sleep(0.1) + with report.new_stage(stage="Test Stage 3"): + time.sleep(0.1) + assert len(report._ingestion_stage_durations) == 3 + assert all( + isinstance(duration, float) and duration > 0.0 + for duration in report._ingestion_stage_durations.values() + ) + + sorted_stages = list(sorted(report._ingestion_stage_durations.keys())) + assert "Test Stage 1" in sorted_stages[0] + assert "Test Stage 2" in sorted_stages[1] + assert "Test Stage 3" in sorted_stages[2] From cc73d57450dfd30826a346e805d169a8b1fc7f9a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Sergio=20G=C3=B3mez=20Villamor?= Date: Fri, 3 Jan 2025 11:15:59 +0100 Subject: [PATCH 08/10] fixup --- .../tests/unit/reporting/test_ingestion_stage.py | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/metadata-ingestion/tests/unit/reporting/test_ingestion_stage.py b/metadata-ingestion/tests/unit/reporting/test_ingestion_stage.py index 7c62214323bda..4b791a2c83d85 100644 --- a/metadata-ingestion/tests/unit/reporting/test_ingestion_stage.py +++ b/metadata-ingestion/tests/unit/reporting/test_ingestion_stage.py @@ -7,8 +7,8 @@ def test_ingestion_stage_context_records_duration(): report = IngestionStageContextReport() with report.new_stage(stage="Test Stage"): pass - assert len(report._ingestion_stage_durations) == 1 - assert "Test Stage" in next(iter(report._ingestion_stage_durations.keys())) + assert len(report.ingestion_stage_durations) == 1 + assert "Test Stage" in next(iter(report.ingestion_stage_durations.keys())) def test_ingestion_stage_context_handles_exceptions(): @@ -18,8 +18,8 @@ def test_ingestion_stage_context_handles_exceptions(): raise ValueError("Test Exception") except ValueError: pass - assert len(report._ingestion_stage_durations) == 1 - assert "Test Stage" in next(iter(report._ingestion_stage_durations)) + assert len(report.ingestion_stage_durations) == 1 + assert "Test Stage" in next(iter(report.ingestion_stage_durations)) def test_ingestion_stage_context_report_handles_multiple_stages(): @@ -30,13 +30,13 @@ def test_ingestion_stage_context_report_handles_multiple_stages(): time.sleep(0.1) with report.new_stage(stage="Test Stage 3"): time.sleep(0.1) - assert len(report._ingestion_stage_durations) == 3 + assert len(report.ingestion_stage_durations) == 3 assert all( isinstance(duration, float) and duration > 0.0 - for duration in report._ingestion_stage_durations.values() + for duration in report.ingestion_stage_durations.values() ) - sorted_stages = list(sorted(report._ingestion_stage_durations.keys())) + sorted_stages = list(sorted(report.ingestion_stage_durations.keys())) assert "Test Stage 1" in sorted_stages[0] assert "Test Stage 2" in sorted_stages[1] assert "Test Stage 3" in sorted_stages[2] From 986fb1dee6eee51109486d6c391c7054cad0d300 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Sergio=20G=C3=B3mez=20Villamor?= Date: Fri, 3 Jan 2025 11:31:07 +0100 Subject: [PATCH 09/10] timer elapsed time default digits = 4 --- .../source/bigquery_v2/bigquery_schema_gen.py | 6 +++--- .../ingestion/source/bigquery_v2/lineage.py | 4 ++-- .../ingestion/source/bigquery_v2/usage.py | 4 ++-- .../ingestion/source/redshift/redshift.py | 16 +++++++--------- .../datahub/ingestion/source/redshift/usage.py | 2 +- .../source/snowflake/snowflake_usage_v2.py | 6 +++--- .../ingestion/source_report/ingestion_stage.py | 2 +- .../src/datahub/utilities/perf_timer.py | 4 ++-- .../performance/bigquery/test_bigquery_usage.py | 2 +- .../tests/performance/databricks/test_unity.py | 2 +- .../performance/snowflake/test_snowflake.py | 2 +- .../tests/performance/sql/test_sql_formatter.py | 6 ++++-- 12 files changed, 28 insertions(+), 28 deletions(-) diff --git a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_schema_gen.py b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_schema_gen.py index 4a3b47f6b543a..4f8c1b9f7fd27 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_schema_gen.py +++ b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_schema_gen.py @@ -1209,9 +1209,9 @@ def get_tables_for_dataset( report=self.report, ) - self.report.metadata_extraction_sec[f"{project_id}.{dataset.name}"] = round( - timer.elapsed_seconds(), 2 - ) + self.report.metadata_extraction_sec[ + f"{project_id}.{dataset.name}" + ] = timer.elapsed_seconds(digits=2) def get_core_table_details( self, dataset_name: str, project_id: str, temp_table_dataset_prefix: str diff --git a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/lineage.py b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/lineage.py index 321b1b6207fab..008adc42ca79c 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/lineage.py +++ b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/lineage.py @@ -381,8 +381,8 @@ def generate_lineage( self.report.lineage_metadata_entries[project_id] = len(lineage) logger.info(f"Built lineage map containing {len(lineage)} entries.") logger.debug(f"lineage metadata is {lineage}") - self.report.lineage_extraction_sec[project_id] = round( - timer.elapsed_seconds(), 2 + self.report.lineage_extraction_sec[project_id] = timer.elapsed_seconds( + digits=2 ) self.report.lineage_mem_size[project_id] = humanfriendly.format_size( memory_footprint.total_size(lineage) diff --git a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/usage.py b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/usage.py index 876ffab85ba31..b9ca8deb68d3a 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/usage.py +++ b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/usage.py @@ -572,8 +572,8 @@ def _get_usage_events(self, projects: Iterable[str]) -> Iterable[AuditEvent]: ) self.report_status(f"usage-extraction-{project_id}", False) - self.report.usage_extraction_sec[project_id] = round( - timer.elapsed_seconds(), 2 + self.report.usage_extraction_sec[project_id] = timer.elapsed_seconds( + digits=2 ) def _store_usage_event( diff --git a/metadata-ingestion/src/datahub/ingestion/source/redshift/redshift.py b/metadata-ingestion/src/datahub/ingestion/source/redshift/redshift.py index 49f7941563c1a..a8ddda201f9c5 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/redshift/redshift.py +++ b/metadata-ingestion/src/datahub/ingestion/source/redshift/redshift.py @@ -633,8 +633,8 @@ def process_schema( else: logger.info("View processing disabled, skipping") - self.report.metadata_extraction_sec[report_key] = round( - timer.elapsed_seconds(), 2 + self.report.metadata_extraction_sec[report_key] = timer.elapsed_seconds( + digits=2 ) def _process_table( @@ -986,9 +986,7 @@ def extract_usage( yield from usage_extractor.get_usage_workunits(all_tables=all_tables) - self.report.usage_extraction_sec[database] = round( - timer.elapsed_seconds(), 2 - ) + self.report.usage_extraction_sec[database] = timer.elapsed_seconds(digits=2) def extract_lineage( self, @@ -1011,8 +1009,8 @@ def extract_lineage( database=database, connection=connection, all_tables=all_tables ) - self.report.lineage_extraction_sec[f"{database}"] = round( - timer.elapsed_seconds(), 2 + self.report.lineage_extraction_sec[f"{database}"] = timer.elapsed_seconds( + digits=2 ) yield from self.generate_lineage( database, lineage_extractor=lineage_extractor @@ -1042,8 +1040,8 @@ def extract_lineage_v2( yield from lineage_extractor.generate() - self.report.lineage_extraction_sec[f"{database}"] = round( - timer.elapsed_seconds(), 2 + self.report.lineage_extraction_sec[f"{database}"] = timer.elapsed_seconds( + digits=2 ) if self.redundant_lineage_run_skip_handler: diff --git a/metadata-ingestion/src/datahub/ingestion/source/redshift/usage.py b/metadata-ingestion/src/datahub/ingestion/source/redshift/usage.py index e0bf8b23dd0f7..7c6affd50321f 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/redshift/usage.py +++ b/metadata-ingestion/src/datahub/ingestion/source/redshift/usage.py @@ -190,7 +190,7 @@ def _get_workunits_internal( ) self.report.operational_metadata_extraction_sec[ self.config.database - ] = round(timer.elapsed_seconds(), 2) + ] = timer.elapsed_seconds(digits=2) # Generate aggregate events self.report.report_ingestion_stage_start(USAGE_EXTRACTION_USAGE_AGGREGATION) diff --git a/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_usage_v2.py b/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_usage_v2.py index 4bdf559f293b5..73565ebf30593 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_usage_v2.py +++ b/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_usage_v2.py @@ -386,7 +386,7 @@ def _get_snowflake_history(self) -> Iterable[SnowflakeJoinedAccessEvent]: ) self.report_status(USAGE_EXTRACTION_OPERATIONAL_STATS, False) return - self.report.access_history_query_secs = round(timer.elapsed_seconds(), 2) + self.report.access_history_query_secs = timer.elapsed_seconds(digits=2) for row in results: yield from self._process_snowflake_history_row(row) @@ -434,8 +434,8 @@ def _check_usage_date_ranges(self) -> None: self.report.max_access_history_time = db_row["MAX_TIME"].astimezone( tz=timezone.utc ) - self.report.access_history_range_query_secs = round( - timer.elapsed_seconds(), 2 + self.report.access_history_range_query_secs = timer.elapsed_seconds( + digits=2 ) def _get_operation_aspect_work_unit( diff --git a/metadata-ingestion/src/datahub/ingestion/source_report/ingestion_stage.py b/metadata-ingestion/src/datahub/ingestion/source_report/ingestion_stage.py index 5d2870d41c686..40959af60ed2b 100644 --- a/metadata-ingestion/src/datahub/ingestion/source_report/ingestion_stage.py +++ b/metadata-ingestion/src/datahub/ingestion/source_report/ingestion_stage.py @@ -32,7 +32,7 @@ class IngestionStageReport: def report_ingestion_stage_start(self, stage: str) -> None: if self._timer: - elapsed = round(self._timer.elapsed_seconds(), 2) + elapsed = self._timer.elapsed_seconds(digits=2) logger.info( f"Time spent in stage <{self.ingestion_stage}>: {elapsed} seconds", stacklevel=2, diff --git a/metadata-ingestion/src/datahub/utilities/perf_timer.py b/metadata-ingestion/src/datahub/utilities/perf_timer.py index b91b93b0eb308..fc1b1ed58244c 100644 --- a/metadata-ingestion/src/datahub/utilities/perf_timer.py +++ b/metadata-ingestion/src/datahub/utilities/perf_timer.py @@ -57,7 +57,7 @@ def __exit__( self.finish() return None - def elapsed_seconds(self, digits: Optional[int] = None) -> float: + def elapsed_seconds(self, digits: int = 4) -> float: """ Returns the elapsed time in seconds. """ @@ -69,7 +69,7 @@ def elapsed_seconds(self, digits: Optional[int] = None) -> float: else: elapsed = (self.end_time - self.start_time) + self._past_active_time - return round(elapsed, digits) if digits else elapsed + return round(elapsed, digits) def assert_timer_is_running(self) -> None: if not self.is_running(): diff --git a/metadata-ingestion/tests/performance/bigquery/test_bigquery_usage.py b/metadata-ingestion/tests/performance/bigquery/test_bigquery_usage.py index 9cb80ff02657b..ee5baacf2441f 100644 --- a/metadata-ingestion/tests/performance/bigquery/test_bigquery_usage.py +++ b/metadata-ingestion/tests/performance/bigquery/test_bigquery_usage.py @@ -80,7 +80,7 @@ def run_test(): num_workunits, peak_memory_usage = workunit_sink(workunits) report.set_ingestion_stage("All", "Done") print(f"Workunits Generated: {num_workunits}") - print(f"Seconds Elapsed: {timer.elapsed_seconds():.2f} seconds") + print(f"Seconds Elapsed: {timer.elapsed_seconds(digits=2)} seconds") print( f"Peak Memory Used: {humanfriendly.format_size(peak_memory_usage - pre_mem_usage)}" diff --git a/metadata-ingestion/tests/performance/databricks/test_unity.py b/metadata-ingestion/tests/performance/databricks/test_unity.py index ddd19804ba184..71192dc5b509b 100644 --- a/metadata-ingestion/tests/performance/databricks/test_unity.py +++ b/metadata-ingestion/tests/performance/databricks/test_unity.py @@ -59,7 +59,7 @@ def run_test(): workunits = source.get_workunits() num_workunits, peak_memory_usage = workunit_sink(workunits) print(f"Workunits Generated: {num_workunits}") - print(f"Seconds Elapsed: {timer.elapsed_seconds():.2f} seconds") + print(f"Seconds Elapsed: {timer.elapsed_seconds(digits=2)} seconds") print( f"Peak Memory Used: {humanfriendly.format_size(peak_memory_usage - pre_mem_usage)}" diff --git a/metadata-ingestion/tests/performance/snowflake/test_snowflake.py b/metadata-ingestion/tests/performance/snowflake/test_snowflake.py index 984d9e4295745..a940cce46a8f7 100644 --- a/metadata-ingestion/tests/performance/snowflake/test_snowflake.py +++ b/metadata-ingestion/tests/performance/snowflake/test_snowflake.py @@ -53,7 +53,7 @@ def run_test(): workunits = source.get_workunits() num_workunits, peak_memory_usage = workunit_sink(workunits) logging.info(f"Workunits Generated: {num_workunits}") - logging.info(f"Seconds Elapsed: {timer.elapsed_seconds():.2f} seconds") + logging.info(f"Seconds Elapsed: {timer.elapsed_seconds(digits=2)} seconds") logging.info(source.get_report().as_string()) logging.info( diff --git a/metadata-ingestion/tests/performance/sql/test_sql_formatter.py b/metadata-ingestion/tests/performance/sql/test_sql_formatter.py index 5f783efc559bc..f09047c0ec4a4 100644 --- a/metadata-ingestion/tests/performance/sql/test_sql_formatter.py +++ b/metadata-ingestion/tests/performance/sql/test_sql_formatter.py @@ -12,12 +12,14 @@ def run_test() -> None: for i in range(N): if i % 50 == 0: print( - f"Running iteration {i}, elapsed time: {timer.elapsed_seconds():.2f} seconds" + f"Running iteration {i}, elapsed time: {timer.elapsed_seconds(digits=2)} seconds" ) try_format_query.__wrapped__(large_sql_query, platform="snowflake") - print(f"Total time taken for {N} iterations: {timer.elapsed_seconds():.2f} seconds") + print( + f"Total time taken for {N} iterations: {timer.elapsed_seconds(digits=2)} seconds" + ) if __name__ == "__main__": From e7825e4e9f379169ae7614e25fa0e0185f5eb87f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Sergio=20G=C3=B3mez=20Villamor?= Date: Tue, 7 Jan 2025 10:18:01 +0100 Subject: [PATCH 10/10] refactor all other usages of the ingestion stage --- .../ingestion/source/bigquery_v2/bigquery.py | 59 ++++---- .../source/bigquery_v2/bigquery_report.py | 3 - .../source/bigquery_v2/bigquery_schema_gen.py | 16 +- .../ingestion/source/bigquery_v2/lineage.py | 10 +- .../ingestion/source/bigquery_v2/usage.py | 110 +++++++------- .../source/cassandra/cassandra_profiling.py | 48 +++--- .../source/cassandra/cassandra_utils.py | 3 - .../source/dremio/dremio_reporting.py | 3 - .../ingestion/source/dremio/dremio_source.py | 4 +- .../datahub/ingestion/source/gc/datahub_gc.py | 24 ++- .../ingestion/source/redshift/redshift.py | 50 +++--- .../ingestion/source/redshift/usage.py | 58 +++---- .../source/snowflake/snowflake_report.py | 3 - .../source/snowflake/snowflake_schema_gen.py | 34 +++-- .../source/snowflake/snowflake_usage_v2.py | 87 ++++++----- .../source/snowflake/snowflake_v2.py | 73 +++++---- .../datahub/ingestion/source/sql/teradata.py | 4 +- .../ingestion/source/tableau/tableau.py | 4 +- .../datahub/ingestion/source/unity/source.py | 142 +++++++++--------- .../source_report/ingestion_stage.py | 29 +--- .../bigquery/test_bigquery_usage.py | 83 +++++----- .../unit/reporting/test_ingestion_stage.py | 8 +- 22 files changed, 407 insertions(+), 448 deletions(-) diff --git a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery.py b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery.py index 38eab3606b7e9..473db888facaa 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery.py +++ b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery.py @@ -253,14 +253,14 @@ def get_workunits_internal(self) -> Iterable[MetadataWorkUnit]: for project in projects: yield from self.bq_schema_extractor.get_project_workunits(project) - self.report.set_ingestion_stage("*", "View and Snapshot Lineage") - yield from self.lineage_extractor.get_lineage_workunits_for_views_and_snapshots( - [p.id for p in projects], - self.bq_schema_extractor.view_refs_by_project, - self.bq_schema_extractor.view_definitions, - self.bq_schema_extractor.snapshot_refs_by_project, - self.bq_schema_extractor.snapshots_by_ref, - ) + with self.report.new_stage("*: View and Snapshot Lineage"): + yield from self.lineage_extractor.get_lineage_workunits_for_views_and_snapshots( + [p.id for p in projects], + self.bq_schema_extractor.view_refs_by_project, + self.bq_schema_extractor.view_definitions, + self.bq_schema_extractor.snapshot_refs_by_project, + self.bq_schema_extractor.snapshots_by_ref, + ) if self.config.use_queries_v2: # if both usage and lineage are disabled then skip queries extractor piece @@ -270,28 +270,27 @@ def get_workunits_internal(self) -> Iterable[MetadataWorkUnit]: ): return - self.report.set_ingestion_stage("*", QUERIES_EXTRACTION) - - with BigQueryQueriesExtractor( - connection=self.config.get_bigquery_client(), - schema_api=self.bq_schema_extractor.schema_api, - config=BigQueryQueriesExtractorConfig( - window=self.config, - user_email_pattern=self.config.usage.user_email_pattern, - include_lineage=self.config.include_table_lineage, - include_usage_statistics=self.config.include_usage_statistics, - include_operations=self.config.usage.include_operational_stats, - top_n_queries=self.config.usage.top_n_queries, - region_qualifiers=self.config.region_qualifiers, - ), - structured_report=self.report, - filters=self.filters, - identifiers=self.identifiers, - schema_resolver=self.sql_parser_schema_resolver, - discovered_tables=self.bq_schema_extractor.table_refs, - ) as queries_extractor: - self.report.queries_extractor = queries_extractor.report - yield from queries_extractor.get_workunits_internal() + with self.report.new_stage(f"*: {QUERIES_EXTRACTION}"): + with BigQueryQueriesExtractor( + connection=self.config.get_bigquery_client(), + schema_api=self.bq_schema_extractor.schema_api, + config=BigQueryQueriesExtractorConfig( + window=self.config, + user_email_pattern=self.config.usage.user_email_pattern, + include_lineage=self.config.include_table_lineage, + include_usage_statistics=self.config.include_usage_statistics, + include_operations=self.config.usage.include_operational_stats, + top_n_queries=self.config.usage.top_n_queries, + region_qualifiers=self.config.region_qualifiers, + ), + structured_report=self.report, + filters=self.filters, + identifiers=self.identifiers, + schema_resolver=self.sql_parser_schema_resolver, + discovered_tables=self.bq_schema_extractor.table_refs, + ) as queries_extractor: + self.report.queries_extractor = queries_extractor.report + yield from queries_extractor.get_workunits_internal() else: if self.config.include_usage_statistics: diff --git a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_report.py b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_report.py index 06842da67f76c..8e55d81aac5fe 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_report.py +++ b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_report.py @@ -190,6 +190,3 @@ class BigQueryV2Report( num_skipped_external_table_lineage: int = 0 queries_extractor: Optional[BigQueryQueriesExtractorReport] = None - - def set_ingestion_stage(self, project_id: str, stage: str) -> None: - self.report_ingestion_stage_start(f"{project_id}: {stage}") diff --git a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_schema_gen.py b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_schema_gen.py index 63e17a74289a7..56e930dfb811f 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_schema_gen.py +++ b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_schema_gen.py @@ -248,9 +248,9 @@ def modified_base32decode(self, text_to_decode: str) -> str: def get_project_workunits( self, project: BigqueryProject ) -> Iterable[MetadataWorkUnit]: - self.report.set_ingestion_stage(project.id, METADATA_EXTRACTION) - logger.info(f"Processing project: {project.id}") - yield from self._process_project(project) + with self.report.new_stage(f"{project.id}: {METADATA_EXTRACTION}"): + logger.info(f"Processing project: {project.id}") + yield from self._process_project(project) def get_dataplatform_instance_aspect( self, dataset_urn: str, project_id: str @@ -405,11 +405,11 @@ def _process_project( if self.config.is_profiling_enabled(): logger.info(f"Starting profiling project {project_id}") - self.report.set_ingestion_stage(project_id, PROFILING) - yield from self.profiler.get_workunits( - project_id=project_id, - tables=db_tables, - ) + with self.report.new_stage(f"{project_id}: {PROFILING}"): + yield from self.profiler.get_workunits( + project_id=project_id, + tables=db_tables, + ) def _process_project_datasets( self, diff --git a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/lineage.py b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/lineage.py index 4863c248d5799..433282a21fdb6 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/lineage.py +++ b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/lineage.py @@ -330,11 +330,11 @@ def get_lineage_workunits( projects = ["*"] # project_id not used when using exported metadata for project in projects: - self.report.set_ingestion_stage(project, LINEAGE_EXTRACTION) - yield from self.generate_lineage( - project, - table_refs, - ) + with self.report.new_stage(f"{project}: {LINEAGE_EXTRACTION}"): + yield from self.generate_lineage( + project, + table_refs, + ) if self.redundant_run_skip_handler: # Update the checkpoint state for this run. diff --git a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/usage.py b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/usage.py index b9ca8deb68d3a..f2f6cc731858d 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/usage.py +++ b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/usage.py @@ -495,62 +495,62 @@ def _ingest_events( def _generate_operational_workunits( self, usage_state: BigQueryUsageState, table_refs: Collection[str] ) -> Iterable[MetadataWorkUnit]: - self.report.set_ingestion_stage("*", USAGE_EXTRACTION_OPERATIONAL_STATS) - for audit_event in usage_state.standalone_events(): - try: - operational_wu = self._create_operation_workunit( - audit_event, table_refs - ) - if operational_wu: - yield operational_wu - self.report.num_operational_stats_workunits_emitted += 1 - except Exception as e: - self.report.warning( - message="Unable to generate operation workunit", - context=f"{audit_event}", - exc=e, - ) + with self.report.new_stage(f"*: {USAGE_EXTRACTION_OPERATIONAL_STATS}"): + for audit_event in usage_state.standalone_events(): + try: + operational_wu = self._create_operation_workunit( + audit_event, table_refs + ) + if operational_wu: + yield operational_wu + self.report.num_operational_stats_workunits_emitted += 1 + except Exception as e: + self.report.warning( + message="Unable to generate operation workunit", + context=f"{audit_event}", + exc=e, + ) def _generate_usage_workunits( self, usage_state: BigQueryUsageState ) -> Iterable[MetadataWorkUnit]: - self.report.set_ingestion_stage("*", USAGE_EXTRACTION_USAGE_AGGREGATION) - top_n = ( - self.config.usage.top_n_queries - if self.config.usage.include_top_n_queries - else 0 - ) - for entry in usage_state.usage_statistics(top_n=top_n): - try: - query_freq = [ - ( - self.uuid_to_query.get( - query_hash, usage_state.queries[query_hash] - ), - count, + with self.report.new_stage(f"*: {USAGE_EXTRACTION_USAGE_AGGREGATION}"): + top_n = ( + self.config.usage.top_n_queries + if self.config.usage.include_top_n_queries + else 0 + ) + for entry in usage_state.usage_statistics(top_n=top_n): + try: + query_freq = [ + ( + self.uuid_to_query.get( + query_hash, usage_state.queries[query_hash] + ), + count, + ) + for query_hash, count in entry.query_freq + ] + yield make_usage_workunit( + bucket_start_time=datetime.fromisoformat(entry.timestamp), + resource=BigQueryTableRef.from_string_name(entry.resource), + query_count=entry.query_count, + query_freq=query_freq, + user_freq=entry.user_freq, + column_freq=entry.column_freq, + bucket_duration=self.config.bucket_duration, + resource_urn_builder=self.identifiers.gen_dataset_urn_from_raw_ref, + top_n_queries=self.config.usage.top_n_queries, + format_sql_queries=self.config.usage.format_sql_queries, + queries_character_limit=self.config.usage.queries_character_limit, + ) + self.report.num_usage_workunits_emitted += 1 + except Exception as e: + self.report.warning( + message="Unable to generate usage statistics workunit", + context=f"{entry.timestamp}, {entry.resource}", + exc=e, ) - for query_hash, count in entry.query_freq - ] - yield make_usage_workunit( - bucket_start_time=datetime.fromisoformat(entry.timestamp), - resource=BigQueryTableRef.from_string_name(entry.resource), - query_count=entry.query_count, - query_freq=query_freq, - user_freq=entry.user_freq, - column_freq=entry.column_freq, - bucket_duration=self.config.bucket_duration, - resource_urn_builder=self.identifiers.gen_dataset_urn_from_raw_ref, - top_n_queries=self.config.usage.top_n_queries, - format_sql_queries=self.config.usage.format_sql_queries, - queries_character_limit=self.config.usage.queries_character_limit, - ) - self.report.num_usage_workunits_emitted += 1 - except Exception as e: - self.report.warning( - message="Unable to generate usage statistics workunit", - context=f"{entry.timestamp}, {entry.resource}", - exc=e, - ) def _get_usage_events(self, projects: Iterable[str]) -> Iterable[AuditEvent]: if self.config.use_exported_bigquery_audit_metadata: @@ -559,10 +559,10 @@ def _get_usage_events(self, projects: Iterable[str]) -> Iterable[AuditEvent]: for project_id in projects: with PerfTimer() as timer: try: - self.report.set_ingestion_stage( - project_id, USAGE_EXTRACTION_INGESTION - ) - yield from self._get_parsed_bigquery_log_events(project_id) + with self.report.new_stage( + f"{project_id}: {USAGE_EXTRACTION_INGESTION}" + ): + yield from self._get_parsed_bigquery_log_events(project_id) except Exception as e: self.report.usage_failed_extraction.append(project_id) self.report.warning( diff --git a/metadata-ingestion/src/datahub/ingestion/source/cassandra/cassandra_profiling.py b/metadata-ingestion/src/datahub/ingestion/source/cassandra/cassandra_profiling.py index d8ab62f1d6d91..7bf1d66f618a4 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/cassandra/cassandra_profiling.py +++ b/metadata-ingestion/src/datahub/ingestion/source/cassandra/cassandra_profiling.py @@ -70,30 +70,30 @@ def get_workunits( ) -> Iterable[MetadataWorkUnit]: for keyspace_name in cassandra_data.keyspaces: tables = cassandra_data.tables.get(keyspace_name, []) - self.report.set_ingestion_stage(keyspace_name, PROFILING) - with ThreadPoolExecutor( - max_workers=self.config.profiling.max_workers - ) as executor: - future_to_dataset = { - executor.submit( - self.generate_profile, - keyspace_name, - table_name, - cassandra_data.columns.get(table_name, []), - ): table_name - for table_name in tables - } - for future in as_completed(future_to_dataset): - table_name = future_to_dataset[future] - try: - yield from future.result() - except Exception as exc: - self.report.profiling_skipped_other[table_name] += 1 - self.report.failure( - message="Failed to profile for table", - context=f"{keyspace_name}.{table_name}", - exc=exc, - ) + with self.report.new_stage(f"{keyspace_name}: {PROFILING}"): + with ThreadPoolExecutor( + max_workers=self.config.profiling.max_workers + ) as executor: + future_to_dataset = { + executor.submit( + self.generate_profile, + keyspace_name, + table_name, + cassandra_data.columns.get(table_name, []), + ): table_name + for table_name in tables + } + for future in as_completed(future_to_dataset): + table_name = future_to_dataset[future] + try: + yield from future.result() + except Exception as exc: + self.report.profiling_skipped_other[table_name] += 1 + self.report.failure( + message="Failed to profile for table", + context=f"{keyspace_name}.{table_name}", + exc=exc, + ) def generate_profile( self, diff --git a/metadata-ingestion/src/datahub/ingestion/source/cassandra/cassandra_utils.py b/metadata-ingestion/src/datahub/ingestion/source/cassandra/cassandra_utils.py index 41d4ac7ced603..75a0ba0c61773 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/cassandra/cassandra_utils.py +++ b/metadata-ingestion/src/datahub/ingestion/source/cassandra/cassandra_utils.py @@ -54,9 +54,6 @@ def report_entity_scanned(self, name: str, ent_type: str = "View") -> None: else: raise KeyError(f"Unknown entity {ent_type}.") - def set_ingestion_stage(self, keyspace: str, stage: str) -> None: - self.report_ingestion_stage_start(f"{keyspace}: {stage}") - # TODO Need to create seperate common config for profiling report profiling_skipped_other: TopKDict[str, int] = field(default_factory=int_top_k_dict) profiling_skipped_table_profile_pattern: TopKDict[str, int] = field( diff --git a/metadata-ingestion/src/datahub/ingestion/source/dremio/dremio_reporting.py b/metadata-ingestion/src/datahub/ingestion/source/dremio/dremio_reporting.py index c8eb035461ca1..9712d4ddc6799 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/dremio/dremio_reporting.py +++ b/metadata-ingestion/src/datahub/ingestion/source/dremio/dremio_reporting.py @@ -45,6 +45,3 @@ def report_entity_scanned(self, name: str, ent_type: str = "View") -> None: self.views_scanned += 1 else: raise KeyError(f"Unknown entity {ent_type}.") - - def set_ingestion_stage(self, dataset: str, stage: str) -> None: - self.report_ingestion_stage_start(f"{dataset}: {stage}") diff --git a/metadata-ingestion/src/datahub/ingestion/source/dremio/dremio_source.py b/metadata-ingestion/src/datahub/ingestion/source/dremio/dremio_source.py index 319290d25169a..6d34e86be6282 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/dremio/dremio_source.py +++ b/metadata-ingestion/src/datahub/ingestion/source/dremio/dremio_source.py @@ -472,8 +472,8 @@ def generate_profiles( env=self.config.env, platform_instance=self.config.platform_instance, ) - self.report.set_ingestion_stage(dataset_info.resource_name, PROFILING) - yield from self.profiler.get_workunits(dataset_info, dataset_urn) + with self.report.new_stage(f"{dataset_info.resource_name}: {PROFILING}"): + yield from self.profiler.get_workunits(dataset_info, dataset_urn) def generate_view_lineage( self, dataset_urn: str, parents: List[str] diff --git a/metadata-ingestion/src/datahub/ingestion/source/gc/datahub_gc.py b/metadata-ingestion/src/datahub/ingestion/source/gc/datahub_gc.py index 168b787b85e8b..317c335411eda 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/gc/datahub_gc.py +++ b/metadata-ingestion/src/datahub/ingestion/source/gc/datahub_gc.py @@ -141,40 +141,36 @@ def get_workunits_internal( ) -> Iterable[MetadataWorkUnit]: if self.config.cleanup_expired_tokens: try: - self.report.report_ingestion_stage_start("Expired Token Cleanup") - self.revoke_expired_tokens() + with self.report.new_stage("Expired Token Cleanup"): + self.revoke_expired_tokens() except Exception as e: self.report.failure("While trying to cleanup expired token ", exc=e) if self.config.truncate_indices: try: - self.report.report_ingestion_stage_start("Truncate Indices") - self.truncate_indices() + with self.report.new_stage("Truncate Indices"): + self.truncate_indices() except Exception as e: self.report.failure("While trying to truncate indices ", exc=e) if self.config.soft_deleted_entities_cleanup.enabled: try: - self.report.report_ingestion_stage_start( - "Soft Deleted Entities Cleanup" - ) - self.soft_deleted_entities_cleanup.cleanup_soft_deleted_entities() + with self.report.new_stage("Soft Deleted Entities Cleanup"): + self.soft_deleted_entities_cleanup.cleanup_soft_deleted_entities() except Exception as e: self.report.failure( "While trying to cleanup soft deleted entities ", exc=e ) if self.config.dataprocess_cleanup.enabled: try: - self.report.report_ingestion_stage_start("Data Process Cleanup") - yield from self.dataprocess_cleanup.get_workunits_internal() + with self.report.new_stage("Data Process Cleanup"): + yield from self.dataprocess_cleanup.get_workunits_internal() except Exception as e: self.report.failure("While trying to cleanup data process ", exc=e) if self.config.execution_request_cleanup.enabled: try: - self.report.report_ingestion_stage_start("Execution request Cleanup") - self.execution_request_cleanup.run() + with self.report.new_stage("Execution request Cleanup"): + self.execution_request_cleanup.run() except Exception as e: self.report.failure("While trying to cleanup execution request ", exc=e) - # Otherwise last stage's duration does not get calculated. - self.report.report_ingestion_stage_start("End") yield from [] def truncate_indices(self) -> None: diff --git a/metadata-ingestion/src/datahub/ingestion/source/redshift/redshift.py b/metadata-ingestion/src/datahub/ingestion/source/redshift/redshift.py index a8ddda201f9c5..5371017a2a321 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/redshift/redshift.py +++ b/metadata-ingestion/src/datahub/ingestion/source/redshift/redshift.py @@ -423,10 +423,10 @@ def get_workunits_internal(self) -> Iterable[Union[MetadataWorkUnit, SqlWorkUnit database = self.config.database logger.info(f"Processing db {database}") - self.report.report_ingestion_stage_start(METADATA_EXTRACTION) - self.db_tables[database] = defaultdict() - self.db_views[database] = defaultdict() - self.db_schemas.setdefault(database, {}) + with self.report.new_stage(METADATA_EXTRACTION): + self.db_tables[database] = defaultdict() + self.db_views[database] = defaultdict() + self.db_schemas.setdefault(database, {}) # TODO: Ideally, we'd push down exception handling to the place where the connection is used, as opposed to keeping # this fallback. For now, this gets us broad coverage quickly. @@ -462,12 +462,12 @@ def _extract_metadata( self.process_schemas(connection, database) ) - self.report.report_ingestion_stage_start(LINEAGE_EXTRACTION) - yield from self.extract_lineage_v2( - connection=connection, - database=database, - lineage_extractor=lineage_extractor, - ) + with self.report.new_stage(LINEAGE_EXTRACTION): + yield from self.extract_lineage_v2( + connection=connection, + database=database, + lineage_extractor=lineage_extractor, + ) all_tables = self.get_all_tables() else: @@ -480,25 +480,25 @@ def _extract_metadata( or self.config.include_view_lineage or self.config.include_copy_lineage ): - self.report.report_ingestion_stage_start(LINEAGE_EXTRACTION) - yield from self.extract_lineage( - connection=connection, all_tables=all_tables, database=database - ) + with self.report.new_stage(LINEAGE_EXTRACTION): + yield from self.extract_lineage( + connection=connection, all_tables=all_tables, database=database + ) - self.report.report_ingestion_stage_start(USAGE_EXTRACTION_INGESTION) if self.config.include_usage_statistics: - yield from self.extract_usage( - connection=connection, all_tables=all_tables, database=database - ) + with self.report.new_stage(USAGE_EXTRACTION_INGESTION): + yield from self.extract_usage( + connection=connection, all_tables=all_tables, database=database + ) if self.config.is_profiling_enabled(): - self.report.report_ingestion_stage_start(PROFILING) - profiler = RedshiftProfiler( - config=self.config, - report=self.report, - state_handler=self.profiling_state_handler, - ) - yield from profiler.get_workunits(self.db_tables) + with self.report.new_stage(PROFILING): + profiler = RedshiftProfiler( + config=self.config, + report=self.report, + state_handler=self.profiling_state_handler, + ) + yield from profiler.get_workunits(self.db_tables) def process_schemas(self, connection, database): for schema in self.data_dictionary.get_schemas( diff --git a/metadata-ingestion/src/datahub/ingestion/source/redshift/usage.py b/metadata-ingestion/src/datahub/ingestion/source/redshift/usage.py index 7c6affd50321f..d66a1ee18be40 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/redshift/usage.py +++ b/metadata-ingestion/src/datahub/ingestion/source/redshift/usage.py @@ -182,38 +182,38 @@ def _get_workunits_internal( self.report.num_operational_stats_filtered = 0 if self.config.include_operational_stats: - self.report.report_ingestion_stage_start(USAGE_EXTRACTION_OPERATIONAL_STATS) - with PerfTimer() as timer: - # Generate operation aspect workunits - yield from self._gen_operation_aspect_workunits( - self.connection, all_tables - ) - self.report.operational_metadata_extraction_sec[ - self.config.database - ] = timer.elapsed_seconds(digits=2) + with self.report.new_stage(USAGE_EXTRACTION_OPERATIONAL_STATS): + with PerfTimer() as timer: + # Generate operation aspect workunits + yield from self._gen_operation_aspect_workunits( + self.connection, all_tables + ) + self.report.operational_metadata_extraction_sec[ + self.config.database + ] = timer.elapsed_seconds(digits=2) # Generate aggregate events - self.report.report_ingestion_stage_start(USAGE_EXTRACTION_USAGE_AGGREGATION) - query: str = self.queries.usage_query( - start_time=self.start_time.strftime(REDSHIFT_DATETIME_FORMAT), - end_time=self.end_time.strftime(REDSHIFT_DATETIME_FORMAT), - database=self.config.database, - ) - access_events_iterable: Iterable[ - RedshiftAccessEvent - ] = self._gen_access_events_from_history_query( - query, connection=self.connection, all_tables=all_tables - ) + with self.report.new_stage(USAGE_EXTRACTION_USAGE_AGGREGATION): + query: str = self.queries.usage_query( + start_time=self.start_time.strftime(REDSHIFT_DATETIME_FORMAT), + end_time=self.end_time.strftime(REDSHIFT_DATETIME_FORMAT), + database=self.config.database, + ) + access_events_iterable: Iterable[ + RedshiftAccessEvent + ] = self._gen_access_events_from_history_query( + query, connection=self.connection, all_tables=all_tables + ) - aggregated_events: AggregatedAccessEvents = self._aggregate_access_events( - access_events_iterable - ) - # Generate usage workunits from aggregated events. - for time_bucket in aggregated_events.values(): - for aggregate in time_bucket.values(): - wu: MetadataWorkUnit = self._make_usage_stat(aggregate) - self.report.num_usage_workunits_emitted += 1 - yield wu + aggregated_events: AggregatedAccessEvents = self._aggregate_access_events( + access_events_iterable + ) + # Generate usage workunits from aggregated events. + for time_bucket in aggregated_events.values(): + for aggregate in time_bucket.values(): + wu: MetadataWorkUnit = self._make_usage_stat(aggregate) + self.report.num_usage_workunits_emitted += 1 + yield wu def _gen_operation_aspect_workunits( self, diff --git a/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_report.py b/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_report.py index 030b2d43be81f..b24471f8666af 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_report.py +++ b/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_report.py @@ -166,6 +166,3 @@ def _is_tag_scanned(self, tag_name: str) -> bool: def report_tag_processed(self, tag_name: str) -> None: self._processed_tags.add(tag_name) - - def set_ingestion_stage(self, database: str, stage: str) -> None: - self.report_ingestion_stage_start(f"{database}: {stage}") diff --git a/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_schema_gen.py b/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_schema_gen.py index 8a1bf15b7a7bc..6f09c26b08da2 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_schema_gen.py +++ b/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_schema_gen.py @@ -216,21 +216,23 @@ def get_workunits_internal(self) -> Iterable[MetadataWorkUnit]: try: for snowflake_db in self.databases: - self.report.set_ingestion_stage(snowflake_db.name, METADATA_EXTRACTION) - yield from self._process_database(snowflake_db) + with self.report.new_stage( + f"{snowflake_db.name}: {METADATA_EXTRACTION}" + ): + yield from self._process_database(snowflake_db) - self.report.set_ingestion_stage("*", EXTERNAL_TABLE_DDL_LINEAGE) - discovered_tables: List[str] = [ - self.identifiers.get_dataset_identifier( - table_name, schema.name, db.name - ) - for db in self.databases - for schema in db.schemas - for table_name in schema.tables - ] - if self.aggregator: - for entry in self._external_tables_ddl_lineage(discovered_tables): - self.aggregator.add(entry) + with self.report.new_stage(f"*: {EXTERNAL_TABLE_DDL_LINEAGE}"): + discovered_tables: List[str] = [ + self.identifiers.get_dataset_identifier( + table_name, schema.name, db.name + ) + for db in self.databases + for schema in db.schemas + for table_name in schema.tables + ] + if self.aggregator: + for entry in self._external_tables_ddl_lineage(discovered_tables): + self.aggregator.add(entry) except SnowflakePermissionError as e: self.structured_reporter.failure( @@ -332,8 +334,8 @@ def _process_database( yield from self._process_db_schemas(snowflake_db, db_tables) if self.profiler and db_tables: - self.report.set_ingestion_stage(snowflake_db.name, PROFILING) - yield from self.profiler.get_workunits(snowflake_db, db_tables) + with self.report.new_stage(f"{snowflake_db.name}: {PROFILING}"): + yield from self.profiler.get_workunits(snowflake_db, db_tables) def _process_db_schemas( self, diff --git a/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_usage_v2.py b/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_usage_v2.py index 73565ebf30593..85e4071aec07d 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_usage_v2.py +++ b/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_usage_v2.py @@ -146,59 +146,58 @@ def get_usage_workunits( if not self._should_ingest_usage(): return - self.report.set_ingestion_stage("*", USAGE_EXTRACTION_USAGE_AGGREGATION) - if self.report.edition == SnowflakeEdition.STANDARD.value: - logger.info( - "Snowflake Account is Standard Edition. Usage and Operation History Feature is not supported." - ) - return + with self.report.new_stage(f"*: {USAGE_EXTRACTION_USAGE_AGGREGATION}"): + if self.report.edition == SnowflakeEdition.STANDARD.value: + logger.info( + "Snowflake Account is Standard Edition. Usage and Operation History Feature is not supported." + ) + return - logger.info("Checking usage date ranges") + logger.info("Checking usage date ranges") - self._check_usage_date_ranges() + self._check_usage_date_ranges() - # If permission error, execution returns from here - if ( - self.report.min_access_history_time is None - or self.report.max_access_history_time is None - ): - return + # If permission error, execution returns from here + if ( + self.report.min_access_history_time is None + or self.report.max_access_history_time is None + ): + return - # NOTE: In earlier `snowflake-usage` connector, users with no email were not considered in usage counts as well as in operation - # Now, we report the usage as well as operation metadata even if user email is absent + # NOTE: In earlier `snowflake-usage` connector, users with no email were not considered in usage counts as well as in operation + # Now, we report the usage as well as operation metadata even if user email is absent - if self.config.include_usage_stats: - yield from auto_empty_dataset_usage_statistics( - self._get_workunits_internal(discovered_datasets), - config=BaseTimeWindowConfig( - start_time=self.start_time, - end_time=self.end_time, - bucket_duration=self.config.bucket_duration, - ), - dataset_urns={ - self.identifiers.gen_dataset_urn(dataset_identifier) - for dataset_identifier in discovered_datasets - }, - ) + if self.config.include_usage_stats: + yield from auto_empty_dataset_usage_statistics( + self._get_workunits_internal(discovered_datasets), + config=BaseTimeWindowConfig( + start_time=self.start_time, + end_time=self.end_time, + bucket_duration=self.config.bucket_duration, + ), + dataset_urns={ + self.identifiers.gen_dataset_urn(dataset_identifier) + for dataset_identifier in discovered_datasets + }, + ) - self.report.set_ingestion_stage("*", USAGE_EXTRACTION_OPERATIONAL_STATS) + with self.report.new_stage(f"*: {USAGE_EXTRACTION_OPERATIONAL_STATS}"): + if self.config.include_operational_stats: + # Generate the operation workunits. + access_events = self._get_snowflake_history() + for event in access_events: + yield from self._get_operation_aspect_work_unit( + event, discovered_datasets + ) - if self.config.include_operational_stats: - # Generate the operation workunits. - access_events = self._get_snowflake_history() - for event in access_events: - yield from self._get_operation_aspect_work_unit( - event, discovered_datasets + if self.redundant_run_skip_handler: + # Update the checkpoint state for this run. + self.redundant_run_skip_handler.update_state( + self.config.start_time, + self.config.end_time, + self.config.bucket_duration, ) - if self.redundant_run_skip_handler: - # Update the checkpoint state for this run. - self.redundant_run_skip_handler.update_state( - self.config.start_time, - self.config.end_time, - self.config.bucket_duration, - ) - def _get_workunits_internal( self, discovered_datasets: List[str] ) -> Iterable[MetadataWorkUnit]: diff --git a/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_v2.py b/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_v2.py index 954e8a29c1a1b..ee1b59ccd86c3 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_v2.py +++ b/metadata-ingestion/src/datahub/ingestion/source/snowflake/snowflake_v2.py @@ -480,8 +480,8 @@ def get_workunits_internal(self) -> Iterable[MetadataWorkUnit]: identifiers=self.identifiers, ) - self.report.set_ingestion_stage("*", METADATA_EXTRACTION) - yield from schema_extractor.get_workunits_internal() + with self.report.new_stage(f"*: {METADATA_EXTRACTION}"): + yield from schema_extractor.get_workunits_internal() databases = schema_extractor.databases @@ -513,45 +513,44 @@ def get_workunits_internal(self) -> Iterable[MetadataWorkUnit]: discovered_datasets = discovered_tables + discovered_views if self.config.use_queries_v2: - self.report.set_ingestion_stage("*", VIEW_PARSING) - yield from auto_workunit(self.aggregator.gen_metadata()) - - self.report.set_ingestion_stage("*", QUERIES_EXTRACTION) - - schema_resolver = self.aggregator._schema_resolver - - queries_extractor = SnowflakeQueriesExtractor( - connection=self.connection, - config=SnowflakeQueriesExtractorConfig( - window=self.config, - temporary_tables_pattern=self.config.temporary_tables_pattern, - include_lineage=self.config.include_table_lineage, - include_usage_statistics=self.config.include_usage_stats, - include_operations=self.config.include_operational_stats, - user_email_pattern=self.config.user_email_pattern, - ), - structured_report=self.report, - filters=self.filters, - identifiers=self.identifiers, - schema_resolver=schema_resolver, - discovered_tables=discovered_datasets, - graph=self.ctx.graph, - ) + with self.report.new_stage(f"*: {VIEW_PARSING}"): + yield from auto_workunit(self.aggregator.gen_metadata()) - # TODO: This is slightly suboptimal because we create two SqlParsingAggregator instances with different configs - # but a shared schema resolver. That's fine for now though - once we remove the old lineage/usage extractors, - # it should be pretty straightforward to refactor this and only initialize the aggregator once. - self.report.queries_extractor = queries_extractor.report - yield from queries_extractor.get_workunits_internal() - queries_extractor.close() + with self.report.new_stage(f"*: {QUERIES_EXTRACTION}"): + schema_resolver = self.aggregator._schema_resolver + + queries_extractor = SnowflakeQueriesExtractor( + connection=self.connection, + config=SnowflakeQueriesExtractorConfig( + window=self.config, + temporary_tables_pattern=self.config.temporary_tables_pattern, + include_lineage=self.config.include_table_lineage, + include_usage_statistics=self.config.include_usage_stats, + include_operations=self.config.include_operational_stats, + user_email_pattern=self.config.user_email_pattern, + ), + structured_report=self.report, + filters=self.filters, + identifiers=self.identifiers, + schema_resolver=schema_resolver, + discovered_tables=discovered_datasets, + graph=self.ctx.graph, + ) + + # TODO: This is slightly suboptimal because we create two SqlParsingAggregator instances with different configs + # but a shared schema resolver. That's fine for now though - once we remove the old lineage/usage extractors, + # it should be pretty straightforward to refactor this and only initialize the aggregator once. + self.report.queries_extractor = queries_extractor.report + yield from queries_extractor.get_workunits_internal() + queries_extractor.close() else: if self.lineage_extractor: - self.report.set_ingestion_stage("*", LINEAGE_EXTRACTION) - self.lineage_extractor.add_time_based_lineage_to_aggregator( - discovered_tables=discovered_tables, - discovered_views=discovered_views, - ) + with self.report.new_stage(f"*: {LINEAGE_EXTRACTION}"): + self.lineage_extractor.add_time_based_lineage_to_aggregator( + discovered_tables=discovered_tables, + discovered_views=discovered_views, + ) # This would emit view and external table ddl lineage # as well as query lineage via lineage_extractor diff --git a/metadata-ingestion/src/datahub/ingestion/source/sql/teradata.py b/metadata-ingestion/src/datahub/ingestion/source/sql/teradata.py index e42564975c3d1..5b76fe41d92e9 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/sql/teradata.py +++ b/metadata-ingestion/src/datahub/ingestion/source/sql/teradata.py @@ -878,7 +878,7 @@ def get_workunits_internal(self) -> Iterable[Union[MetadataWorkUnit, SqlWorkUnit urns = self.schema_resolver.get_urns() if self.config.include_table_lineage or self.config.include_usage_statistics: - self.report.report_ingestion_stage_start("audit log extraction") - yield from self.get_audit_log_mcps(urns=urns) + with self.report.new_stage("Audit log extraction"): + yield from self.get_audit_log_mcps(urns=urns) yield from self.builder.gen_workunits() diff --git a/metadata-ingestion/src/datahub/ingestion/source/tableau/tableau.py b/metadata-ingestion/src/datahub/ingestion/source/tableau/tableau.py index bf6d81ea97c51..f24988acb0d7b 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/tableau/tableau.py +++ b/metadata-ingestion/src/datahub/ingestion/source/tableau/tableau.py @@ -118,7 +118,7 @@ ) from datahub.ingestion.source.tableau.tableau_server_wrapper import UserInfo from datahub.ingestion.source.tableau.tableau_validation import check_user_role -from datahub.ingestion.source_report.ingestion_stage import IngestionStageContextReport +from datahub.ingestion.source_report.ingestion_stage import IngestionStageReport from datahub.metadata.com.linkedin.pegasus2avro.common import ( AuditStamp, ChangeAuditStamps, @@ -643,7 +643,7 @@ class SiteIdContentUrl: @dataclass class TableauSourceReport( StaleEntityRemovalSourceReport, - IngestionStageContextReport, + IngestionStageReport, ): get_all_datasources_query_failed: bool = False num_get_datasource_query_failures: int = 0 diff --git a/metadata-ingestion/src/datahub/ingestion/source/unity/source.py b/metadata-ingestion/src/datahub/ingestion/source/unity/source.py index 7bfa7fdb28aaf..ba0f958458e68 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/unity/source.py +++ b/metadata-ingestion/src/datahub/ingestion/source/unity/source.py @@ -267,86 +267,86 @@ def get_workunit_processors(self) -> List[Optional[MetadataWorkUnitProcessor]]: ] def get_workunits_internal(self) -> Iterable[MetadataWorkUnit]: - self.report.report_ingestion_stage_start("Ingestion Setup") - wait_on_warehouse = None - if self.config.include_hive_metastore: - self.report.report_ingestion_stage_start("Start warehouse") - # Can take several minutes, so start now and wait later - wait_on_warehouse = self.unity_catalog_api_proxy.start_warehouse() - if wait_on_warehouse is None: - self.report.report_failure( - "initialization", - f"SQL warehouse {self.config.profiling.warehouse_id} not found", - ) - return - else: - # wait until warehouse is started - wait_on_warehouse.result() + with self.report.new_stage("Ingestion Setup"): + wait_on_warehouse = None + if self.config.include_hive_metastore: + with self.report.new_stage("Start warehouse"): + # Can take several minutes, so start now and wait later + wait_on_warehouse = self.unity_catalog_api_proxy.start_warehouse() + if wait_on_warehouse is None: + self.report.report_failure( + "initialization", + f"SQL warehouse {self.config.profiling.warehouse_id} not found", + ) + return + else: + # wait until warehouse is started + wait_on_warehouse.result() if self.config.include_ownership: - self.report.report_ingestion_stage_start("Ingest service principals") - self.build_service_principal_map() - self.build_groups_map() + with self.report.new_stage("Ingest service principals"): + self.build_service_principal_map() + self.build_groups_map() if self.config.include_notebooks: - self.report.report_ingestion_stage_start("Ingest notebooks") - yield from self.process_notebooks() + with self.report.new_stage("Ingest notebooks"): + yield from self.process_notebooks() yield from self.process_metastores() yield from self.get_view_lineage() if self.config.include_notebooks: - self.report.report_ingestion_stage_start("Notebook lineage") - for notebook in self.notebooks.values(): - wu = self._gen_notebook_lineage(notebook) - if wu: - yield wu + with self.report.new_stage("Notebook lineage"): + for notebook in self.notebooks.values(): + wu = self._gen_notebook_lineage(notebook) + if wu: + yield wu if self.config.include_usage_statistics: - self.report.report_ingestion_stage_start("Ingest usage") - usage_extractor = UnityCatalogUsageExtractor( - config=self.config, - report=self.report, - proxy=self.unity_catalog_api_proxy, - table_urn_builder=self.gen_dataset_urn, - user_urn_builder=self.gen_user_urn, - ) - yield from usage_extractor.get_usage_workunits( - self.table_refs | self.view_refs - ) - - if self.config.is_profiling_enabled(): - self.report.report_ingestion_stage_start("Start warehouse") - # Need to start the warehouse again for profiling, - # as it may have been stopped after ingestion might take - # longer time to complete - wait_on_warehouse = self.unity_catalog_api_proxy.start_warehouse() - if wait_on_warehouse is None: - self.report.report_failure( - "initialization", - f"SQL warehouse {self.config.profiling.warehouse_id} not found", + with self.report.new_stage("Ingest usage"): + usage_extractor = UnityCatalogUsageExtractor( + config=self.config, + report=self.report, + proxy=self.unity_catalog_api_proxy, + table_urn_builder=self.gen_dataset_urn, + user_urn_builder=self.gen_user_urn, + ) + yield from usage_extractor.get_usage_workunits( + self.table_refs | self.view_refs ) - return - else: - # wait until warehouse is started - wait_on_warehouse.result() - self.report.report_ingestion_stage_start("Profiling") - if isinstance(self.config.profiling, UnityCatalogAnalyzeProfilerConfig): - yield from UnityCatalogAnalyzeProfiler( - self.config.profiling, - self.report, - self.unity_catalog_api_proxy, - self.gen_dataset_urn, - ).get_workunits(self.table_refs) - elif isinstance(self.config.profiling, UnityCatalogGEProfilerConfig): - yield from UnityCatalogGEProfiler( - sql_common_config=self.config, - profiling_config=self.config.profiling, - report=self.report, - ).get_workunits(list(self.tables.values())) - else: - raise ValueError("Unknown profiling config method") + if self.config.is_profiling_enabled(): + with self.report.new_stage("Start warehouse"): + # Need to start the warehouse again for profiling, + # as it may have been stopped after ingestion might take + # longer time to complete + wait_on_warehouse = self.unity_catalog_api_proxy.start_warehouse() + if wait_on_warehouse is None: + self.report.report_failure( + "initialization", + f"SQL warehouse {self.config.profiling.warehouse_id} not found", + ) + return + else: + # wait until warehouse is started + wait_on_warehouse.result() + + with self.report.new_stage("Profiling"): + if isinstance(self.config.profiling, UnityCatalogAnalyzeProfilerConfig): + yield from UnityCatalogAnalyzeProfiler( + self.config.profiling, + self.report, + self.unity_catalog_api_proxy, + self.gen_dataset_urn, + ).get_workunits(self.table_refs) + elif isinstance(self.config.profiling, UnityCatalogGEProfilerConfig): + yield from UnityCatalogGEProfiler( + sql_common_config=self.config, + profiling_config=self.config.profiling, + report=self.report, + ).get_workunits(list(self.tables.values())) + else: + raise ValueError("Unknown profiling config method") def build_service_principal_map(self) -> None: try: @@ -466,11 +466,11 @@ def process_schemas(self, catalog: Catalog) -> Iterable[MetadataWorkUnit]: self.report.schemas.dropped(schema.id) continue - self.report.report_ingestion_stage_start(f"Ingest schema {schema.id}") - yield from self.gen_schema_containers(schema) - yield from self.process_tables(schema) + with self.report.new_stage(f"Ingest schema {schema.id}"): + yield from self.gen_schema_containers(schema) + yield from self.process_tables(schema) - self.report.schemas.processed(schema.id) + self.report.schemas.processed(schema.id) def process_tables(self, schema: Schema) -> Iterable[MetadataWorkUnit]: for table in self.unity_catalog_api_proxy.tables(schema=schema): diff --git a/metadata-ingestion/src/datahub/ingestion/source_report/ingestion_stage.py b/metadata-ingestion/src/datahub/ingestion/source_report/ingestion_stage.py index 40959af60ed2b..130a36e254fef 100644 --- a/metadata-ingestion/src/datahub/ingestion/source_report/ingestion_stage.py +++ b/metadata-ingestion/src/datahub/ingestion/source_report/ingestion_stage.py @@ -2,7 +2,6 @@ from contextlib import AbstractContextManager from dataclasses import dataclass, field from datetime import datetime, timezone -from typing import Optional from datahub.utilities.perf_timer import PerfTimer from datahub.utilities.stats_collections import TopKDict @@ -23,32 +22,6 @@ @dataclass class IngestionStageReport: - ingestion_stage: Optional[str] = None - ingestion_stage_durations: TopKDict[str, float] = field(default_factory=TopKDict) - - _timer: Optional[PerfTimer] = field( - default=None, init=False, repr=False, compare=False - ) - - def report_ingestion_stage_start(self, stage: str) -> None: - if self._timer: - elapsed = self._timer.elapsed_seconds(digits=2) - logger.info( - f"Time spent in stage <{self.ingestion_stage}>: {elapsed} seconds", - stacklevel=2, - ) - if self.ingestion_stage: - self.ingestion_stage_durations[self.ingestion_stage] = elapsed - else: - self._timer = PerfTimer() - - self.ingestion_stage = f"{stage} at {datetime.now(timezone.utc)}" - logger.info(f"Stage started: {self.ingestion_stage}") - self._timer.start() - - -@dataclass -class IngestionStageContextReport: ingestion_stage_durations: TopKDict[str, float] = field(default_factory=TopKDict) def new_stage(self, stage: str) -> "IngestionStageContext": @@ -57,7 +30,7 @@ def new_stage(self, stage: str) -> "IngestionStageContext": @dataclass class IngestionStageContext(AbstractContextManager): - def __init__(self, stage: str, report: IngestionStageContextReport): + def __init__(self, stage: str, report: IngestionStageReport): self._ingestion_stage = f"{stage} at {datetime.now(timezone.utc)}" self._timer: PerfTimer = PerfTimer() self._report = report diff --git a/metadata-ingestion/tests/performance/bigquery/test_bigquery_usage.py b/metadata-ingestion/tests/performance/bigquery/test_bigquery_usage.py index ee5baacf2441f..24460f3829806 100644 --- a/metadata-ingestion/tests/performance/bigquery/test_bigquery_usage.py +++ b/metadata-ingestion/tests/performance/bigquery/test_bigquery_usage.py @@ -26,14 +26,14 @@ def run_test(): report = BigQueryV2Report() - report.set_ingestion_stage("All", "Seed Data Generation") - seed_metadata = generate_data( - num_containers=2000, - num_tables=20000, - num_views=2000, - time_range=timedelta(days=7), - ) - all_tables = seed_metadata.all_tables + with report.new_stage("All: Seed Data Generation"): + seed_metadata = generate_data( + num_containers=2000, + num_tables=20000, + num_views=2000, + time_range=timedelta(days=7), + ) + all_tables = seed_metadata.all_tables config = BigQueryV2Config( start_time=seed_metadata.start_time, @@ -51,42 +51,45 @@ def run_test(): schema_resolver=SchemaResolver(platform="bigquery"), identifiers=BigQueryIdentifierBuilder(config, report), ) - report.set_ingestion_stage("All", "Event Generation") - - num_projects = 100 - projects = [f"project-{i}" for i in range(num_projects)] - table_to_project = {table.name: random.choice(projects) for table in all_tables} - table_refs = {str(ref_from_table(table, table_to_project)) for table in all_tables} + with report.new_stage("All: Event Generation"): + num_projects = 100 + projects = [f"project-{i}" for i in range(num_projects)] + table_to_project = {table.name: random.choice(projects) for table in all_tables} + table_refs = { + str(ref_from_table(table, table_to_project)) for table in all_tables + } - queries = list( - generate_queries( - seed_metadata, - num_selects=240_000, - num_operations=800_000, - num_unique_queries=50_000, - num_users=2000, - query_length=NormalDistribution(2000, 500), + queries = list( + generate_queries( + seed_metadata, + num_selects=240_000, + num_operations=800_000, + num_unique_queries=50_000, + num_users=2000, + query_length=NormalDistribution(2000, 500), + ) ) - ) - queries.sort(key=lambda q: q.timestamp) - events = list(generate_events(queries, projects, table_to_project, config=config)) - print(f"Events generated: {len(events)}") - pre_mem_usage = psutil.Process(os.getpid()).memory_info().rss - print(f"Test data size: {humanfriendly.format_size(pre_mem_usage)}") + queries.sort(key=lambda q: q.timestamp) + events = list( + generate_events(queries, projects, table_to_project, config=config) + ) + print(f"Events generated: {len(events)}") + pre_mem_usage = psutil.Process(os.getpid()).memory_info().rss + print(f"Test data size: {humanfriendly.format_size(pre_mem_usage)}") - report.set_ingestion_stage("All", "Event Ingestion") - with PerfTimer() as timer: - workunits = usage_extractor._get_workunits_internal(events, table_refs) - num_workunits, peak_memory_usage = workunit_sink(workunits) - report.set_ingestion_stage("All", "Done") - print(f"Workunits Generated: {num_workunits}") - print(f"Seconds Elapsed: {timer.elapsed_seconds(digits=2)} seconds") + with report.new_stage("All: Event Ingestion"): + with PerfTimer() as timer: + workunits = usage_extractor._get_workunits_internal(events, table_refs) + num_workunits, peak_memory_usage = workunit_sink(workunits) + with report.new_stage("All: Done"): + print(f"Workunits Generated: {num_workunits}") + print(f"Seconds Elapsed: {timer.elapsed_seconds(digits=2)} seconds") - print( - f"Peak Memory Used: {humanfriendly.format_size(peak_memory_usage - pre_mem_usage)}" - ) - print(f"Disk Used: {report.processing_perf.usage_state_size}") - print(f"Hash collisions: {report.num_usage_query_hash_collisions}") + print( + f"Peak Memory Used: {humanfriendly.format_size(peak_memory_usage - pre_mem_usage)}" + ) + print(f"Disk Used: {report.processing_perf.usage_state_size}") + print(f"Hash collisions: {report.num_usage_query_hash_collisions}") if __name__ == "__main__": diff --git a/metadata-ingestion/tests/unit/reporting/test_ingestion_stage.py b/metadata-ingestion/tests/unit/reporting/test_ingestion_stage.py index 4b791a2c83d85..8bae38eaa7444 100644 --- a/metadata-ingestion/tests/unit/reporting/test_ingestion_stage.py +++ b/metadata-ingestion/tests/unit/reporting/test_ingestion_stage.py @@ -1,10 +1,10 @@ import time -from datahub.ingestion.source_report.ingestion_stage import IngestionStageContextReport +from datahub.ingestion.source_report.ingestion_stage import IngestionStageReport def test_ingestion_stage_context_records_duration(): - report = IngestionStageContextReport() + report = IngestionStageReport() with report.new_stage(stage="Test Stage"): pass assert len(report.ingestion_stage_durations) == 1 @@ -12,7 +12,7 @@ def test_ingestion_stage_context_records_duration(): def test_ingestion_stage_context_handles_exceptions(): - report = IngestionStageContextReport() + report = IngestionStageReport() try: with report.new_stage(stage="Test Stage"): raise ValueError("Test Exception") @@ -23,7 +23,7 @@ def test_ingestion_stage_context_handles_exceptions(): def test_ingestion_stage_context_report_handles_multiple_stages(): - report = IngestionStageContextReport() + report = IngestionStageReport() with report.new_stage(stage="Test Stage 1"): time.sleep(0.1) with report.new_stage(stage="Test Stage 2"):