From 1b3e0d403f1d911d457d207781f0b6b583797a43 Mon Sep 17 00:00:00 2001 From: PredictiveManish Date: Sat, 6 Dec 2025 22:55:06 +0530 Subject: [PATCH 1/3] Fix: Eliminate redundant full table scans in messages and events collection Signed-off-by: PredictiveManish --- augur/tasks/github/events.py | 24 ++++++++++----------- augur/tasks/github/messages.py | 39 +++++++++++++++++----------------- 2 files changed, 31 insertions(+), 32 deletions(-) diff --git a/augur/tasks/github/events.py b/augur/tasks/github/events.py index 38a5e9e9c6..0de82a3648 100644 --- a/augur/tasks/github/events.py +++ b/augur/tasks/github/events.py @@ -115,17 +115,20 @@ def collect(self, repo_git, key_auth, since): owner, repo = get_owner_repo(repo_git) self.repo_identifier = f"{owner}/{repo}" + # Build mappings once before processing any events + issue_url_to_id_map = self._get_map_from_issue_url_to_id(repo_id) + pr_url_to_id_map = self._get_map_from_pr_url_to_id(repo_id) + events = [] for event in self._collect_events(repo_git, key_auth, since): events.append(event) - # making this a decent size since process_events retrieves all the issues and prs each time if len(events) >= 500: - self._process_events(events, repo_id) + self._process_events(events, repo_id, issue_url_to_id_map, pr_url_to_id_map) events.clear() if events: - self._process_events(events, repo_id) + self._process_events(events, repo_id, issue_url_to_id_map, pr_url_to_id_map) def _collect_events(self, repo_git: str, key_auth, since): @@ -143,7 +146,7 @@ def _collect_events(self, repo_git: str, key_auth, since): if since and datetime.fromisoformat(event["created_at"].replace("Z", "+00:00")).replace(tzinfo=timezone.utc) < since: return - def _process_events(self, events, repo_id): + def _process_events(self, events, repo_id, issue_url_to_id_map, pr_url_to_id_map): issue_events = [] pr_events = [] @@ -161,19 +164,16 @@ def _process_events(self, events, repo_id): if not_mappable_events: self._logger.warning(f"{self.repo_identifier} - 
{self.task_name}: Unable to map these github events to an issue or pr: {not_mappable_events}") - self._process_issue_events(issue_events, repo_id) - self._process_pr_events(pr_events, repo_id) + self._process_issue_events(issue_events, repo_id, issue_url_to_id_map) + self._process_pr_events(pr_events, repo_id, pr_url_to_id_map) update_issue_closed_cntrbs_by_repo_id(repo_id) - def _process_issue_events(self, issue_events, repo_id): + def _process_issue_events(self, issue_events, repo_id, issue_url_to_id_map): issue_event_dicts = [] contributors = [] - - issue_url_to_id_map = self._get_map_from_issue_url_to_id(repo_id) - for event in issue_events: event, contributor = self._process_github_event_contributors(event) @@ -200,13 +200,11 @@ def _process_issue_events(self, issue_events, repo_id): self._insert_issue_events(issue_event_dicts) - def _process_pr_events(self, pr_events, repo_id): + def _process_pr_events(self, pr_events, repo_id, pr_url_to_id_map): pr_event_dicts = [] contributors = [] - pr_url_to_id_map = self._get_map_from_pr_url_to_id(repo_id) - for event in pr_events: event, contributor = self._process_github_event_contributors(event) diff --git a/augur/tasks/github/messages.py b/augur/tasks/github/messages.py index 6a888be5e7..8cfa4dbe05 100644 --- a/augur/tasks/github/messages.py +++ b/augur/tasks/github/messages.py @@ -37,17 +37,30 @@ def collect_github_messages(repo_git: str, full_collection: bool) -> None: core_data_last_collected = (get_core_data_last_collected(repo_id) - timedelta(days=2)).replace(tzinfo=timezone.utc) + # Build mappings once before processing any messages + # create mapping from issue url to issue id of current issues + issue_url_to_id_map = {} + issues = augur_db.session.query(Issue).filter(Issue.repo_id == repo_id).all() + for issue in issues: + issue_url_to_id_map[issue.issue_url] = issue.issue_id + + # create mapping from pr url to pr id of current pull requests + pr_issue_url_to_id_map = {} + prs = 
augur_db.session.query(PullRequest).filter(PullRequest.repo_id == repo_id).all() + for pr in prs: + pr_issue_url_to_id_map[pr.pr_issue_url] = pr.pull_request_id + if is_repo_small(repo_id): message_data = fast_retrieve_all_pr_and_issue_messages(repo_git, logger, manifest.key_auth, task_name, core_data_last_collected) if message_data: - process_messages(message_data, task_name, repo_id, logger, augur_db) + process_messages(message_data, task_name, repo_id, logger, augur_db, issue_url_to_id_map, pr_issue_url_to_id_map) else: logger.info(f"{owner}/{repo} has no messages") else: - process_large_issue_and_pr_message_collection(repo_id, repo_git, logger, manifest.key_auth, task_name, augur_db, core_data_last_collected) + process_large_issue_and_pr_message_collection(repo_id, repo_git, logger, manifest.key_auth, task_name, augur_db, core_data_last_collected, issue_url_to_id_map, pr_issue_url_to_id_map) def is_repo_small(repo_id): @@ -80,7 +93,7 @@ def fast_retrieve_all_pr_and_issue_messages(repo_git: str, logger, key_auth, tas return list(github_data_access.paginate_resource(url)) -def process_large_issue_and_pr_message_collection(repo_id, repo_git: str, logger, key_auth, task_name, augur_db, since) -> None: +def process_large_issue_and_pr_message_collection(repo_id, repo_git: str, logger, key_auth, task_name, augur_db, since, issue_url_to_id_map, pr_issue_url_to_id_map) -> None: owner, repo = get_owner_repo(repo_git) @@ -124,17 +137,17 @@ def process_large_issue_and_pr_message_collection(repo_id, repo_git: str, logger logger.info(f"{task_name}: PR or issue comment url of {comment_url} returned 404. 
Skipping.") skipped_urls += 1 - if len(all_data) >= 20: - process_messages(all_data, task_name, repo_id, logger, augur_db) + if len(all_data) >= 1000: + process_messages(all_data, task_name, repo_id, logger, augur_db, issue_url_to_id_map, pr_issue_url_to_id_map) all_data.clear() if len(all_data) > 0: - process_messages(all_data, task_name, repo_id, logger, augur_db) + process_messages(all_data, task_name, repo_id, logger, augur_db, issue_url_to_id_map, pr_issue_url_to_id_map) logger.info(f"{task_name}: Finished. Skipped {skipped_urls} comment URLs due to 404.") -def process_messages(messages, task_name, repo_id, logger, augur_db): +def process_messages(messages, task_name, repo_id, logger, augur_db, issue_url_to_id_map, pr_issue_url_to_id_map): tool_source = "Pr comment task" tool_version = "2.0" @@ -151,18 +164,6 @@ def process_messages(messages, task_name, repo_id, logger, augur_db): if len(messages) == 0: logger.info(f"{task_name}: No messages to process") - # create mapping from issue url to issue id of current issues - issue_url_to_id_map = {} - issues = augur_db.session.query(Issue).filter(Issue.repo_id == repo_id).all() - for issue in issues: - issue_url_to_id_map[issue.issue_url] = issue.issue_id - - # create mapping from pr url to pr id of current pull requests - pr_issue_url_to_id_map = {} - prs = augur_db.session.query(PullRequest).filter(PullRequest.repo_id == repo_id).all() - for pr in prs: - pr_issue_url_to_id_map[pr.pr_issue_url] = pr.pull_request_id - message_len = len(messages) for index, message in enumerate(messages): From 9824374d21c086ce8571951ab752abd052b55544 Mon Sep 17 00:00:00 2001 From: PredictiveManish Date: Sun, 7 Dec 2025 23:25:50 +0530 Subject: [PATCH 2/3] Reduced batch overhead Signed-off-by: PredictiveManish --- augur/tasks/github/messages.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/augur/tasks/github/messages.py b/augur/tasks/github/messages.py index 8cfa4dbe05..3e1ce831f9 100644 --- 
a/augur/tasks/github/messages.py +++ b/augur/tasks/github/messages.py @@ -137,7 +137,7 @@ def process_large_issue_and_pr_message_collection(repo_id, repo_git: str, logger logger.info(f"{task_name}: PR or issue comment url of {comment_url} returned 404. Skipping.") skipped_urls += 1 - if len(all_data) >= 1000: + if len(all_data) >= 200: process_messages(all_data, task_name, repo_id, logger, augur_db, issue_url_to_id_map, pr_issue_url_to_id_map) all_data.clear() From ac28a7533f40bc5c73eaf78c03a00646593e7f48 Mon Sep 17 00:00:00 2001 From: PredictiveManish Date: Mon, 8 Dec 2025 21:36:14 +0530 Subject: [PATCH 3/3] Replaced magic number with constant Signed-off-by: PredictiveManish --- augur/tasks/github/messages.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/augur/tasks/github/messages.py b/augur/tasks/github/messages.py index 3e1ce831f9..fe4fa777f6 100644 --- a/augur/tasks/github/messages.py +++ b/augur/tasks/github/messages.py @@ -14,7 +14,7 @@ from sqlalchemy.sql import text platform_id = 1 - +MESSAGE_BATCH_SIZE = 200 @celery.task(base=AugurCoreRepoCollectionTask) def collect_github_messages(repo_git: str, full_collection: bool) -> None: @@ -137,7 +137,7 @@ def process_large_issue_and_pr_message_collection(repo_id, repo_git: str, logger logger.info(f"{task_name}: PR or issue comment url of {comment_url} returned 404. Skipping.") skipped_urls += 1 - if len(all_data) >= 200: + if len(all_data) >= MESSAGE_BATCH_SIZE: process_messages(all_data, task_name, repo_id, logger, augur_db, issue_url_to_id_map, pr_issue_url_to_id_map) all_data.clear()