chore(tableau): set ingestion stage report and perftimers #12234

Merged
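Across these files the PR replaces the per-source set_ingestion_stage(...) helpers (which forwarded to report_ingestion_stage_start) with a with report.new_stage("<prefix>: <stage>") context manager, so each stage is opened and closed around the exact block of work it covers. The context-manager implementation itself is not part of this diff; the sketch below is a minimal illustration of the pattern, and the class name, durations dict, and rounding behavior are assumptions, not the actual DataHub code.

import contextlib
import time
from typing import Dict, Iterator


class IngestionStageReport:
    """Hypothetical report mixin: records how long each ingestion stage takes."""

    def __init__(self) -> None:
        self.ingestion_stage_durations: Dict[str, float] = {}

    @contextlib.contextmanager
    def new_stage(self, stage: str) -> Iterator[None]:
        # Time the stage even if the body raises, so every stage that was
        # started still shows up in the report with a duration.
        start = time.perf_counter()
        try:
            yield
        finally:
            self.ingestion_stage_durations[stage] = round(
                time.perf_counter() - start, 2
            )


# Usage mirroring the call sites in this diff:
# with self.report.new_stage(f"{project.id}: {METADATA_EXTRACTION}"):
#     yield from self._process_project(project)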
2 changes: 2 additions & 0 deletions metadata-ingestion/src/datahub/ingestion/api/source.py
@@ -334,6 +334,8 @@ def as_obj(self) -> dict:
}

def compute_stats(self) -> None:
super().compute_stats()

duration = datetime.datetime.now() - self.start_time
workunits_produced = self.events_produced
if duration.total_seconds() > 0:
@@ -253,14 +253,14 @@ def get_workunits_internal(self) -> Iterable[MetadataWorkUnit]:
for project in projects:
yield from self.bq_schema_extractor.get_project_workunits(project)

self.report.set_ingestion_stage("*", "View and Snapshot Lineage")
yield from self.lineage_extractor.get_lineage_workunits_for_views_and_snapshots(
[p.id for p in projects],
self.bq_schema_extractor.view_refs_by_project,
self.bq_schema_extractor.view_definitions,
self.bq_schema_extractor.snapshot_refs_by_project,
self.bq_schema_extractor.snapshots_by_ref,
)
with self.report.new_stage("*: View and Snapshot Lineage"):
yield from self.lineage_extractor.get_lineage_workunits_for_views_and_snapshots(
[p.id for p in projects],
self.bq_schema_extractor.view_refs_by_project,
self.bq_schema_extractor.view_definitions,
self.bq_schema_extractor.snapshot_refs_by_project,
self.bq_schema_extractor.snapshots_by_ref,
)

if self.config.use_queries_v2:
# if both usage and lineage are disabled then skip queries extractor piece
@@ -270,31 +270,29 @@ def get_workunits_internal(self) -> Iterable[MetadataWorkUnit]:
):
return

self.report.set_ingestion_stage("*", QUERIES_EXTRACTION)

with BigQueryQueriesExtractor(
connection=self.config.get_bigquery_client(),
schema_api=self.bq_schema_extractor.schema_api,
config=BigQueryQueriesExtractorConfig(
window=self.config,
user_email_pattern=self.config.usage.user_email_pattern,
include_lineage=self.config.include_table_lineage,
include_usage_statistics=self.config.include_usage_statistics,
include_operations=self.config.usage.include_operational_stats,
include_queries=self.config.include_queries,
include_query_usage_statistics=self.config.include_query_usage_statistics,
top_n_queries=self.config.usage.top_n_queries,
region_qualifiers=self.config.region_qualifiers,
),
structured_report=self.report,
filters=self.filters,
identifiers=self.identifiers,
schema_resolver=self.sql_parser_schema_resolver,
discovered_tables=self.bq_schema_extractor.table_refs,
) as queries_extractor:
self.report.queries_extractor = queries_extractor.report
yield from queries_extractor.get_workunits_internal()

with self.report.new_stage(f"*: {QUERIES_EXTRACTION}"):
with BigQueryQueriesExtractor(
connection=self.config.get_bigquery_client(),
schema_api=self.bq_schema_extractor.schema_api,
config=BigQueryQueriesExtractorConfig(
window=self.config,
user_email_pattern=self.config.usage.user_email_pattern,
include_lineage=self.config.include_table_lineage,
include_usage_statistics=self.config.include_usage_statistics,
include_operations=self.config.usage.include_operational_stats,
include_queries=self.config.include_queries,
include_query_usage_statistics=self.config.include_query_usage_statistics,
top_n_queries=self.config.usage.top_n_queries,
region_qualifiers=self.config.region_qualifiers,
),
structured_report=self.report,
filters=self.filters,
identifiers=self.identifiers,
schema_resolver=self.sql_parser_schema_resolver,
discovered_tables=self.bq_schema_extractor.table_refs,
) as queries_extractor:
self.report.queries_extractor = queries_extractor.report
yield from queries_extractor.get_workunits_internal()
else:
if self.config.include_usage_statistics:
yield from self.usage_extractor.get_usage_workunits(
@@ -190,6 +190,3 @@ class BigQueryV2Report(
num_skipped_external_table_lineage: int = 0

queries_extractor: Optional[BigQueryQueriesExtractorReport] = None

def set_ingestion_stage(self, project_id: str, stage: str) -> None:
self.report_ingestion_stage_start(f"{project_id}: {stage}")
@@ -248,9 +248,9 @@ def modified_base32decode(self, text_to_decode: str) -> str:
def get_project_workunits(
self, project: BigqueryProject
) -> Iterable[MetadataWorkUnit]:
self.report.set_ingestion_stage(project.id, METADATA_EXTRACTION)
logger.info(f"Processing project: {project.id}")
yield from self._process_project(project)
with self.report.new_stage(f"{project.id}: {METADATA_EXTRACTION}"):
logger.info(f"Processing project: {project.id}")
yield from self._process_project(project)

def get_dataplatform_instance_aspect(
self, dataset_urn: str, project_id: str
@@ -405,11 +405,11 @@ def _process_project(

if self.config.is_profiling_enabled():
logger.info(f"Starting profiling project {project_id}")
self.report.set_ingestion_stage(project_id, PROFILING)
yield from self.profiler.get_workunits(
project_id=project_id,
tables=db_tables,
)
with self.report.new_stage(f"{project_id}: {PROFILING}"):
yield from self.profiler.get_workunits(
project_id=project_id,
tables=db_tables,
)

def _process_project_datasets(
self,
@@ -1203,9 +1203,9 @@ def get_tables_for_dataset(
report=self.report,
)

self.report.metadata_extraction_sec[f"{project_id}.{dataset.name}"] = round(
timer.elapsed_seconds(), 2
)
self.report.metadata_extraction_sec[
f"{project_id}.{dataset.name}"
] = timer.elapsed_seconds(digits=2)

def get_core_table_details(
self, dataset_name: str, project_id: str, temp_table_dataset_prefix: str
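The hunks above also change how elapsed time reaches the report: instead of wrapping each call site in round(timer.elapsed_seconds(), 2), the rounding moves into the timer as timer.elapsed_seconds(digits=2). DataHub's actual PerfTimer is not shown in this diff; the simplified stand-in below only illustrates that signature change, and its names and behavior are assumptions.

import time
from typing import Optional


class PerfTimer:
    """Simplified stand-in for a context-manager timer; assumed, not the real class."""

    def __enter__(self) -> "PerfTimer":
        self._start = time.perf_counter()
        self._end: Optional[float] = None
        return self

    def __exit__(self, *exc_info: object) -> None:
        self._end = time.perf_counter()

    def elapsed_seconds(self, digits: Optional[int] = None) -> float:
        # Rounding happens inside the timer, so call sites can ask for
        # elapsed_seconds(digits=2) instead of wrapping the result in round().
        end = self._end if self._end is not None else time.perf_counter()
        elapsed = end - self._start
        return round(elapsed, digits) if digits is not None else elapsed


# Usage matching the report assignments in this diff:
# with PerfTimer() as timer:
#     ...  # metadata / lineage / usage extraction work
# report.metadata_extraction_sec[key] = timer.elapsed_seconds(digits=2)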
@@ -330,11 +330,11 @@ def get_lineage_workunits(
projects = ["*"] # project_id not used when using exported metadata

for project in projects:
self.report.set_ingestion_stage(project, LINEAGE_EXTRACTION)
yield from self.generate_lineage(
project,
table_refs,
)
with self.report.new_stage(f"{project}: {LINEAGE_EXTRACTION}"):
yield from self.generate_lineage(
project,
table_refs,
)

if self.redundant_run_skip_handler:
# Update the checkpoint state for this run.
@@ -368,8 +368,8 @@ def generate_lineage(
self.report.lineage_metadata_entries[project_id] = len(lineage)
logger.info(f"Built lineage map containing {len(lineage)} entries.")
logger.debug(f"lineage metadata is {lineage}")
self.report.lineage_extraction_sec[project_id] = round(
timer.elapsed_seconds(), 2
self.report.lineage_extraction_sec[project_id] = timer.elapsed_seconds(
digits=2
)
self.report.lineage_mem_size[project_id] = humanfriendly.format_size(
memory_footprint.total_size(lineage)
114 changes: 57 additions & 57 deletions metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/usage.py
@@ -495,62 +495,62 @@ def _ingest_events(
def _generate_operational_workunits(
self, usage_state: BigQueryUsageState, table_refs: Collection[str]
) -> Iterable[MetadataWorkUnit]:
self.report.set_ingestion_stage("*", USAGE_EXTRACTION_OPERATIONAL_STATS)
for audit_event in usage_state.standalone_events():
try:
operational_wu = self._create_operation_workunit(
audit_event, table_refs
)
if operational_wu:
yield operational_wu
self.report.num_operational_stats_workunits_emitted += 1
except Exception as e:
self.report.warning(
message="Unable to generate operation workunit",
context=f"{audit_event}",
exc=e,
)
with self.report.new_stage(f"*: {USAGE_EXTRACTION_OPERATIONAL_STATS}"):
for audit_event in usage_state.standalone_events():
try:
operational_wu = self._create_operation_workunit(
audit_event, table_refs
)
if operational_wu:
yield operational_wu
self.report.num_operational_stats_workunits_emitted += 1
except Exception as e:
self.report.warning(
message="Unable to generate operation workunit",
context=f"{audit_event}",
exc=e,
)

def _generate_usage_workunits(
self, usage_state: BigQueryUsageState
) -> Iterable[MetadataWorkUnit]:
self.report.set_ingestion_stage("*", USAGE_EXTRACTION_USAGE_AGGREGATION)
top_n = (
self.config.usage.top_n_queries
if self.config.usage.include_top_n_queries
else 0
)
for entry in usage_state.usage_statistics(top_n=top_n):
try:
query_freq = [
(
self.uuid_to_query.get(
query_hash, usage_state.queries[query_hash]
),
count,
with self.report.new_stage(f"*: {USAGE_EXTRACTION_USAGE_AGGREGATION}"):
top_n = (
self.config.usage.top_n_queries
if self.config.usage.include_top_n_queries
else 0
)
for entry in usage_state.usage_statistics(top_n=top_n):
try:
query_freq = [
(
self.uuid_to_query.get(
query_hash, usage_state.queries[query_hash]
),
count,
)
for query_hash, count in entry.query_freq
]
yield make_usage_workunit(
bucket_start_time=datetime.fromisoformat(entry.timestamp),
resource=BigQueryTableRef.from_string_name(entry.resource),
query_count=entry.query_count,
query_freq=query_freq,
user_freq=entry.user_freq,
column_freq=entry.column_freq,
bucket_duration=self.config.bucket_duration,
resource_urn_builder=self.identifiers.gen_dataset_urn_from_raw_ref,
top_n_queries=self.config.usage.top_n_queries,
format_sql_queries=self.config.usage.format_sql_queries,
queries_character_limit=self.config.usage.queries_character_limit,
)
self.report.num_usage_workunits_emitted += 1
except Exception as e:
self.report.warning(
message="Unable to generate usage statistics workunit",
context=f"{entry.timestamp}, {entry.resource}",
exc=e,
)
for query_hash, count in entry.query_freq
]
yield make_usage_workunit(
bucket_start_time=datetime.fromisoformat(entry.timestamp),
resource=BigQueryTableRef.from_string_name(entry.resource),
query_count=entry.query_count,
query_freq=query_freq,
user_freq=entry.user_freq,
column_freq=entry.column_freq,
bucket_duration=self.config.bucket_duration,
resource_urn_builder=self.identifiers.gen_dataset_urn_from_raw_ref,
top_n_queries=self.config.usage.top_n_queries,
format_sql_queries=self.config.usage.format_sql_queries,
queries_character_limit=self.config.usage.queries_character_limit,
)
self.report.num_usage_workunits_emitted += 1
except Exception as e:
self.report.warning(
message="Unable to generate usage statistics workunit",
context=f"{entry.timestamp}, {entry.resource}",
exc=e,
)

def _get_usage_events(self, projects: Iterable[str]) -> Iterable[AuditEvent]:
if self.config.use_exported_bigquery_audit_metadata:
@@ -559,10 +559,10 @@ def _get_usage_events(self, projects: Iterable[str]) -> Iterable[AuditEvent]:
for project_id in projects:
with PerfTimer() as timer:
try:
self.report.set_ingestion_stage(
project_id, USAGE_EXTRACTION_INGESTION
)
yield from self._get_parsed_bigquery_log_events(project_id)
with self.report.new_stage(
f"{project_id}: {USAGE_EXTRACTION_INGESTION}"
):
yield from self._get_parsed_bigquery_log_events(project_id)
except Exception as e:
self.report.usage_failed_extraction.append(project_id)
self.report.warning(
Expand All @@ -572,8 +572,8 @@ def _get_usage_events(self, projects: Iterable[str]) -> Iterable[AuditEvent]:
)
self.report_status(f"usage-extraction-{project_id}", False)

self.report.usage_extraction_sec[project_id] = round(
timer.elapsed_seconds(), 2
self.report.usage_extraction_sec[project_id] = timer.elapsed_seconds(
digits=2
)

def _store_usage_event(
@@ -70,30 +70,30 @@ def get_workunits(
) -> Iterable[MetadataWorkUnit]:
for keyspace_name in cassandra_data.keyspaces:
tables = cassandra_data.tables.get(keyspace_name, [])
self.report.set_ingestion_stage(keyspace_name, PROFILING)
with ThreadPoolExecutor(
max_workers=self.config.profiling.max_workers
) as executor:
future_to_dataset = {
executor.submit(
self.generate_profile,
keyspace_name,
table_name,
cassandra_data.columns.get(table_name, []),
): table_name
for table_name in tables
}
for future in as_completed(future_to_dataset):
table_name = future_to_dataset[future]
try:
yield from future.result()
except Exception as exc:
self.report.profiling_skipped_other[table_name] += 1
self.report.failure(
message="Failed to profile for table",
context=f"{keyspace_name}.{table_name}",
exc=exc,
)
with self.report.new_stage(f"{keyspace_name}: {PROFILING}"):
with ThreadPoolExecutor(
max_workers=self.config.profiling.max_workers
) as executor:
future_to_dataset = {
executor.submit(
self.generate_profile,
keyspace_name,
table_name,
cassandra_data.columns.get(table_name, []),
): table_name
for table_name in tables
}
for future in as_completed(future_to_dataset):
table_name = future_to_dataset[future]
try:
yield from future.result()
except Exception as exc:
self.report.profiling_skipped_other[table_name] += 1
self.report.failure(
message="Failed to profile for table",
context=f"{keyspace_name}.{table_name}",
exc=exc,
)

def generate_profile(
self,
@@ -54,9 +54,6 @@ def report_entity_scanned(self, name: str, ent_type: str = "View") -> None:
else:
raise KeyError(f"Unknown entity {ent_type}.")

def set_ingestion_stage(self, keyspace: str, stage: str) -> None:
self.report_ingestion_stage_start(f"{keyspace}: {stage}")

# TODO Need to create seperate common config for profiling report
profiling_skipped_other: TopKDict[str, int] = field(default_factory=int_top_k_dict)
profiling_skipped_table_profile_pattern: TopKDict[str, int] = field(
@@ -45,6 +45,3 @@ def report_entity_scanned(self, name: str, ent_type: str = "View") -> None:
self.views_scanned += 1
else:
raise KeyError(f"Unknown entity {ent_type}.")

def set_ingestion_stage(self, dataset: str, stage: str) -> None:
self.report_ingestion_stage_start(f"{dataset}: {stage}")
@@ -472,8 +472,8 @@ def generate_profiles(
env=self.config.env,
platform_instance=self.config.platform_instance,
)
self.report.set_ingestion_stage(dataset_info.resource_name, PROFILING)
yield from self.profiler.get_workunits(dataset_info, dataset_urn)
with self.report.new_stage(f"{dataset_info.resource_name}: {PROFILING}"):
yield from self.profiler.get_workunits(dataset_info, dataset_urn)

def generate_view_lineage(
self, dataset_urn: str, parents: List[str]