
Commit 52e9f69

sgomezvillamor authored and chakru-r committed
chore(tableau): set ingestion stage report and perftimers (datahub-project#12234)
1 parent 144db11 commit 52e9f69

27 files changed (+625, -499 lines)
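
Across these files, the commit replaces each source's ad-hoc set_ingestion_stage(...) helper with a new_stage(...) context manager on the report object, and folds rounding of timer readings into PerfTimer.elapsed_seconds(digits=...). As a rough sketch of how such a stage context manager could work (an illustration under assumptions, not DataHub's actual implementation):

    import contextlib
    import time
    from typing import Dict, Iterator

    class IngestionStageReport:
        def __init__(self) -> None:
            # stage name -> elapsed seconds, recorded when the stage exits
            self.ingestion_stage_durations: Dict[str, float] = {}

        @contextlib.contextmanager
        def new_stage(self, stage: str) -> Iterator[None]:
            start = time.perf_counter()
            try:
                yield
            finally:
                # Runs even if the stage body raises, so no timing is lost.
                self.ingestion_stage_durations[stage] = round(
                    time.perf_counter() - start, 2
                )

Unlike the removed imperative calls, where a stage implicitly ran until the next stage began, the with-block gives each stage an explicit end, including on error paths.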

metadata-ingestion/src/datahub/ingestion/api/source.py (+2)

@@ -334,6 +334,8 @@ def as_obj(self) -> dict:
         }

     def compute_stats(self) -> None:
+        super().compute_stats()
+
         duration = datetime.datetime.now() - self.start_time
         workunits_produced = self.events_produced
         if duration.total_seconds() > 0:
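
The added super().compute_stats() call ensures that any bookkeeping the base report class does in compute_stats (plausibly, finalizing a still-open ingestion stage) is not skipped by this override. A minimal illustration of the failure mode, with hypothetical class names:

    class BaseReport:
        def compute_stats(self) -> None:
            print("base bookkeeping runs")

    class SourceReport(BaseReport):
        def compute_stats(self) -> None:
            super().compute_stats()  # without this, base bookkeeping is skipped
            print("source-specific stats run")

    SourceReport().compute_stats()
    # -> base bookkeeping runs
    # -> source-specific stats run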

metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery.py (+31, -33)

@@ -253,14 +253,14 @@ def get_workunits_internal(self) -> Iterable[MetadataWorkUnit]:
         for project in projects:
             yield from self.bq_schema_extractor.get_project_workunits(project)

-        self.report.set_ingestion_stage("*", "View and Snapshot Lineage")
-        yield from self.lineage_extractor.get_lineage_workunits_for_views_and_snapshots(
-            [p.id for p in projects],
-            self.bq_schema_extractor.view_refs_by_project,
-            self.bq_schema_extractor.view_definitions,
-            self.bq_schema_extractor.snapshot_refs_by_project,
-            self.bq_schema_extractor.snapshots_by_ref,
-        )
+        with self.report.new_stage("*: View and Snapshot Lineage"):
+            yield from self.lineage_extractor.get_lineage_workunits_for_views_and_snapshots(
+                [p.id for p in projects],
+                self.bq_schema_extractor.view_refs_by_project,
+                self.bq_schema_extractor.view_definitions,
+                self.bq_schema_extractor.snapshot_refs_by_project,
+                self.bq_schema_extractor.snapshots_by_ref,
+            )

         if self.config.use_queries_v2:
             # if both usage and lineage are disabled then skip queries extractor piece
@@ -270,31 +270,29 @@ def get_workunits_internal(self) -> Iterable[MetadataWorkUnit]:
             ):
                 return

-            self.report.set_ingestion_stage("*", QUERIES_EXTRACTION)
-
-            with BigQueryQueriesExtractor(
-                connection=self.config.get_bigquery_client(),
-                schema_api=self.bq_schema_extractor.schema_api,
-                config=BigQueryQueriesExtractorConfig(
-                    window=self.config,
-                    user_email_pattern=self.config.usage.user_email_pattern,
-                    include_lineage=self.config.include_table_lineage,
-                    include_usage_statistics=self.config.include_usage_statistics,
-                    include_operations=self.config.usage.include_operational_stats,
-                    include_queries=self.config.include_queries,
-                    include_query_usage_statistics=self.config.include_query_usage_statistics,
-                    top_n_queries=self.config.usage.top_n_queries,
-                    region_qualifiers=self.config.region_qualifiers,
-                ),
-                structured_report=self.report,
-                filters=self.filters,
-                identifiers=self.identifiers,
-                schema_resolver=self.sql_parser_schema_resolver,
-                discovered_tables=self.bq_schema_extractor.table_refs,
-            ) as queries_extractor:
-                self.report.queries_extractor = queries_extractor.report
-                yield from queries_extractor.get_workunits_internal()
-
+            with self.report.new_stage(f"*: {QUERIES_EXTRACTION}"):
+                with BigQueryQueriesExtractor(
+                    connection=self.config.get_bigquery_client(),
+                    schema_api=self.bq_schema_extractor.schema_api,
+                    config=BigQueryQueriesExtractorConfig(
+                        window=self.config,
+                        user_email_pattern=self.config.usage.user_email_pattern,
+                        include_lineage=self.config.include_table_lineage,
+                        include_usage_statistics=self.config.include_usage_statistics,
+                        include_operations=self.config.usage.include_operational_stats,
+                        include_queries=self.config.include_queries,
+                        include_query_usage_statistics=self.config.include_query_usage_statistics,
+                        top_n_queries=self.config.usage.top_n_queries,
+                        region_qualifiers=self.config.region_qualifiers,
+                    ),
+                    structured_report=self.report,
+                    filters=self.filters,
+                    identifiers=self.identifiers,
+                    schema_resolver=self.sql_parser_schema_resolver,
+                    discovered_tables=self.bq_schema_extractor.table_refs,
+                ) as queries_extractor:
+                    self.report.queries_extractor = queries_extractor.report
+                    yield from queries_extractor.get_workunits_internal()
         else:
             if self.config.include_usage_statistics:
                 yield from self.usage_extractor.get_usage_workunits(
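
Note that new_stage here wraps yield from inside a generator method, so the stage is entered when the consumer first pulls a workunit and exits only once the generator is drained past that block; the recorded duration therefore includes the consumer's processing time. A small self-contained demonstration of that behavior:

    import contextlib

    @contextlib.contextmanager
    def stage(name: str):
        print(f"enter {name}")
        try:
            yield
        finally:
            print(f"exit {name}")

    def workunits():
        with stage("lineage"):
            yield from range(2)

    for wu in workunits():
        print(wu)
    # enter lineage, 0, 1, exit lineage -- the stage closes only when
    # the generator is exhausted, not when workunits() is called.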

metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_report.py (-3)

@@ -190,6 +190,3 @@ class BigQueryV2Report(
     num_skipped_external_table_lineage: int = 0

     queries_extractor: Optional[BigQueryQueriesExtractorReport] = None
-
-    def set_ingestion_stage(self, project_id: str, stage: str) -> None:
-        self.report_ingestion_stage_start(f"{project_id}: {stage}")

metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_schema_gen.py (+11, -11)

@@ -248,9 +248,9 @@ def modified_base32decode(self, text_to_decode: str) -> str:
     def get_project_workunits(
         self, project: BigqueryProject
     ) -> Iterable[MetadataWorkUnit]:
-        self.report.set_ingestion_stage(project.id, METADATA_EXTRACTION)
-        logger.info(f"Processing project: {project.id}")
-        yield from self._process_project(project)
+        with self.report.new_stage(f"{project.id}: {METADATA_EXTRACTION}"):
+            logger.info(f"Processing project: {project.id}")
+            yield from self._process_project(project)

     def get_dataplatform_instance_aspect(
         self, dataset_urn: str, project_id: str
@@ -405,11 +405,11 @@ def _process_project(

         if self.config.is_profiling_enabled():
             logger.info(f"Starting profiling project {project_id}")
-            self.report.set_ingestion_stage(project_id, PROFILING)
-            yield from self.profiler.get_workunits(
-                project_id=project_id,
-                tables=db_tables,
-            )
+            with self.report.new_stage(f"{project_id}: {PROFILING}"):
+                yield from self.profiler.get_workunits(
+                    project_id=project_id,
+                    tables=db_tables,
+                )

     def _process_project_datasets(
         self,
@@ -1203,9 +1203,9 @@ def get_tables_for_dataset(
                 report=self.report,
             )

-        self.report.metadata_extraction_sec[f"{project_id}.{dataset.name}"] = round(
-            timer.elapsed_seconds(), 2
-        )
+        self.report.metadata_extraction_sec[
+            f"{project_id}.{dataset.name}"
+        ] = timer.elapsed_seconds(digits=2)

     def get_core_table_details(
         self, dataset_name: str, project_id: str, temp_table_dataset_prefix: str
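
The last hunk folds the round(..., 2) call into the timer itself via a digits parameter. A sketch of a PerfTimer supporting that signature (assumed for illustration; DataHub's actual PerfTimer may differ in detail):

    import time
    from typing import Optional

    class PerfTimer:
        def __enter__(self) -> "PerfTimer":
            self._start = time.perf_counter()
            self._end: Optional[float] = None
            return self

        def __exit__(self, *exc) -> None:
            self._end = time.perf_counter()

        def elapsed_seconds(self, digits: Optional[int] = None) -> float:
            # Readable mid-flight (before __exit__) or after the with-block.
            end = self._end if self._end is not None else time.perf_counter()
            elapsed = end - self._start
            return round(elapsed, digits) if digits is not None else elapsed

    with PerfTimer() as timer:
        time.sleep(0.05)
    print(timer.elapsed_seconds(digits=2))  # e.g. 0.05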

metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/lineage.py (+7, -7)

@@ -330,11 +330,11 @@ def get_lineage_workunits(
            projects = ["*"]  # project_id not used when using exported metadata

        for project in projects:
-            self.report.set_ingestion_stage(project, LINEAGE_EXTRACTION)
-            yield from self.generate_lineage(
-                project,
-                table_refs,
-            )
+            with self.report.new_stage(f"{project}: {LINEAGE_EXTRACTION}"):
+                yield from self.generate_lineage(
+                    project,
+                    table_refs,
+                )

        if self.redundant_run_skip_handler:
            # Update the checkpoint state for this run.
@@ -368,8 +368,8 @@ def generate_lineage(
        self.report.lineage_metadata_entries[project_id] = len(lineage)
        logger.info(f"Built lineage map containing {len(lineage)} entries.")
        logger.debug(f"lineage metadata is {lineage}")
-        self.report.lineage_extraction_sec[project_id] = round(
-            timer.elapsed_seconds(), 2
+        self.report.lineage_extraction_sec[project_id] = timer.elapsed_seconds(
+            digits=2
        )
        self.report.lineage_mem_size[project_id] = humanfriendly.format_size(
            memory_footprint.total_size(lineage)

metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/usage.py (+57, -57)

@@ -495,62 +495,62 @@ def _ingest_events(
     def _generate_operational_workunits(
         self, usage_state: BigQueryUsageState, table_refs: Collection[str]
     ) -> Iterable[MetadataWorkUnit]:
-        self.report.set_ingestion_stage("*", USAGE_EXTRACTION_OPERATIONAL_STATS)
-        for audit_event in usage_state.standalone_events():
-            try:
-                operational_wu = self._create_operation_workunit(
-                    audit_event, table_refs
-                )
-                if operational_wu:
-                    yield operational_wu
-                    self.report.num_operational_stats_workunits_emitted += 1
-            except Exception as e:
-                self.report.warning(
-                    message="Unable to generate operation workunit",
-                    context=f"{audit_event}",
-                    exc=e,
-                )
+        with self.report.new_stage(f"*: {USAGE_EXTRACTION_OPERATIONAL_STATS}"):
+            for audit_event in usage_state.standalone_events():
+                try:
+                    operational_wu = self._create_operation_workunit(
+                        audit_event, table_refs
+                    )
+                    if operational_wu:
+                        yield operational_wu
+                        self.report.num_operational_stats_workunits_emitted += 1
+                except Exception as e:
+                    self.report.warning(
+                        message="Unable to generate operation workunit",
+                        context=f"{audit_event}",
+                        exc=e,
+                    )

     def _generate_usage_workunits(
         self, usage_state: BigQueryUsageState
     ) -> Iterable[MetadataWorkUnit]:
-        self.report.set_ingestion_stage("*", USAGE_EXTRACTION_USAGE_AGGREGATION)
-        top_n = (
-            self.config.usage.top_n_queries
-            if self.config.usage.include_top_n_queries
-            else 0
-        )
-        for entry in usage_state.usage_statistics(top_n=top_n):
-            try:
-                query_freq = [
-                    (
-                        self.uuid_to_query.get(
-                            query_hash, usage_state.queries[query_hash]
-                        ),
-                        count,
-                    )
-                    for query_hash, count in entry.query_freq
-                ]
-                yield make_usage_workunit(
-                    bucket_start_time=datetime.fromisoformat(entry.timestamp),
-                    resource=BigQueryTableRef.from_string_name(entry.resource),
-                    query_count=entry.query_count,
-                    query_freq=query_freq,
-                    user_freq=entry.user_freq,
-                    column_freq=entry.column_freq,
-                    bucket_duration=self.config.bucket_duration,
-                    resource_urn_builder=self.identifiers.gen_dataset_urn_from_raw_ref,
-                    top_n_queries=self.config.usage.top_n_queries,
-                    format_sql_queries=self.config.usage.format_sql_queries,
-                    queries_character_limit=self.config.usage.queries_character_limit,
-                )
-                self.report.num_usage_workunits_emitted += 1
-            except Exception as e:
-                self.report.warning(
-                    message="Unable to generate usage statistics workunit",
-                    context=f"{entry.timestamp}, {entry.resource}",
-                    exc=e,
-                )
+        with self.report.new_stage(f"*: {USAGE_EXTRACTION_USAGE_AGGREGATION}"):
+            top_n = (
+                self.config.usage.top_n_queries
+                if self.config.usage.include_top_n_queries
+                else 0
+            )
+            for entry in usage_state.usage_statistics(top_n=top_n):
+                try:
+                    query_freq = [
+                        (
+                            self.uuid_to_query.get(
+                                query_hash, usage_state.queries[query_hash]
+                            ),
+                            count,
+                        )
+                        for query_hash, count in entry.query_freq
+                    ]
+                    yield make_usage_workunit(
+                        bucket_start_time=datetime.fromisoformat(entry.timestamp),
+                        resource=BigQueryTableRef.from_string_name(entry.resource),
+                        query_count=entry.query_count,
+                        query_freq=query_freq,
+                        user_freq=entry.user_freq,
+                        column_freq=entry.column_freq,
+                        bucket_duration=self.config.bucket_duration,
+                        resource_urn_builder=self.identifiers.gen_dataset_urn_from_raw_ref,
+                        top_n_queries=self.config.usage.top_n_queries,
+                        format_sql_queries=self.config.usage.format_sql_queries,
+                        queries_character_limit=self.config.usage.queries_character_limit,
+                    )
+                    self.report.num_usage_workunits_emitted += 1
+                except Exception as e:
+                    self.report.warning(
+                        message="Unable to generate usage statistics workunit",
+                        context=f"{entry.timestamp}, {entry.resource}",
+                        exc=e,
+                    )

     def _get_usage_events(self, projects: Iterable[str]) -> Iterable[AuditEvent]:
         if self.config.use_exported_bigquery_audit_metadata:
@@ -559,10 +559,10 @@ def _get_usage_events(self, projects: Iterable[str]) -> Iterable[AuditEvent]:
         for project_id in projects:
             with PerfTimer() as timer:
                 try:
-                    self.report.set_ingestion_stage(
-                        project_id, USAGE_EXTRACTION_INGESTION
-                    )
-                    yield from self._get_parsed_bigquery_log_events(project_id)
+                    with self.report.new_stage(
+                        f"{project_id}: {USAGE_EXTRACTION_INGESTION}"
+                    ):
+                        yield from self._get_parsed_bigquery_log_events(project_id)
                 except Exception as e:
                     self.report.usage_failed_extraction.append(project_id)
                     self.report.warning(
@@ -572,8 +572,8 @@ def _get_usage_events(self, projects: Iterable[str]) -> Iterable[AuditEvent]:
                     )
                     self.report_status(f"usage-extraction-{project_id}", False)

-            self.report.usage_extraction_sec[project_id] = round(
-                timer.elapsed_seconds(), 2
+            self.report.usage_extraction_sec[project_id] = timer.elapsed_seconds(
+                digits=2
             )

     def _store_usage_event(

metadata-ingestion/src/datahub/ingestion/source/cassandra/cassandra_profiling.py (+24, -24)

@@ -70,30 +70,30 @@ def get_workunits(
     ) -> Iterable[MetadataWorkUnit]:
         for keyspace_name in cassandra_data.keyspaces:
             tables = cassandra_data.tables.get(keyspace_name, [])
-            self.report.set_ingestion_stage(keyspace_name, PROFILING)
-            with ThreadPoolExecutor(
-                max_workers=self.config.profiling.max_workers
-            ) as executor:
-                future_to_dataset = {
-                    executor.submit(
-                        self.generate_profile,
-                        keyspace_name,
-                        table_name,
-                        cassandra_data.columns.get(table_name, []),
-                    ): table_name
-                    for table_name in tables
-                }
-                for future in as_completed(future_to_dataset):
-                    table_name = future_to_dataset[future]
-                    try:
-                        yield from future.result()
-                    except Exception as exc:
-                        self.report.profiling_skipped_other[table_name] += 1
-                        self.report.failure(
-                            message="Failed to profile for table",
-                            context=f"{keyspace_name}.{table_name}",
-                            exc=exc,
-                        )
+            with self.report.new_stage(f"{keyspace_name}: {PROFILING}"):
+                with ThreadPoolExecutor(
+                    max_workers=self.config.profiling.max_workers
+                ) as executor:
+                    future_to_dataset = {
+                        executor.submit(
+                            self.generate_profile,
+                            keyspace_name,
+                            table_name,
+                            cassandra_data.columns.get(table_name, []),
+                        ): table_name
+                        for table_name in tables
+                    }
+                    for future in as_completed(future_to_dataset):
+                        table_name = future_to_dataset[future]
+                        try:
+                            yield from future.result()
+                        except Exception as exc:
+                            self.report.profiling_skipped_other[table_name] += 1
+                            self.report.failure(
+                                message="Failed to profile for table",
+                                context=f"{keyspace_name}.{table_name}",
+                                exc=exc,
+                            )

     def generate_profile(
         self,
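
For reference, the fan-out pattern inside the stage: submit one profiling task per table, then surface results (and per-table failures) as they finish rather than in submission order. A stripped-down, self-contained version of the same submit/as_completed idiom:

    from concurrent.futures import ThreadPoolExecutor, as_completed

    def profile(table: str) -> str:
        return f"profile of {table}"

    tables = ["t1", "t2", "t3"]
    with ThreadPoolExecutor(max_workers=2) as executor:
        # Map each future back to its table name so failures can be attributed.
        future_to_table = {executor.submit(profile, t): t for t in tables}
        for future in as_completed(future_to_table):
            table = future_to_table[future]
            try:
                print(future.result())
            except Exception as exc:
                print(f"profiling {table} failed: {exc}")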

metadata-ingestion/src/datahub/ingestion/source/cassandra/cassandra_utils.py (-3)

@@ -54,9 +54,6 @@ def report_entity_scanned(self, name: str, ent_type: str = "View") -> None:
         else:
             raise KeyError(f"Unknown entity {ent_type}.")

-    def set_ingestion_stage(self, keyspace: str, stage: str) -> None:
-        self.report_ingestion_stage_start(f"{keyspace}: {stage}")
-
     # TODO Need to create seperate common config for profiling report
     profiling_skipped_other: TopKDict[str, int] = field(default_factory=int_top_k_dict)
     profiling_skipped_table_profile_pattern: TopKDict[str, int] = field(

metadata-ingestion/src/datahub/ingestion/source/dremio/dremio_reporting.py (-3)

@@ -45,6 +45,3 @@ def report_entity_scanned(self, name: str, ent_type: str = "View") -> None:
             self.views_scanned += 1
         else:
             raise KeyError(f"Unknown entity {ent_type}.")
-
-    def set_ingestion_stage(self, dataset: str, stage: str) -> None:
-        self.report_ingestion_stage_start(f"{dataset}: {stage}")

metadata-ingestion/src/datahub/ingestion/source/dremio/dremio_source.py (+2, -2)

@@ -472,8 +472,8 @@ def generate_profiles(
             env=self.config.env,
             platform_instance=self.config.platform_instance,
         )
-        self.report.set_ingestion_stage(dataset_info.resource_name, PROFILING)
-        yield from self.profiler.get_workunits(dataset_info, dataset_urn)
+        with self.report.new_stage(f"{dataset_info.resource_name}: {PROFILING}"):
+            yield from self.profiler.get_workunits(dataset_info, dataset_urn)

     def generate_view_lineage(
         self, dataset_urn: str, parents: List[str]
