From 15a212f97b7b30ade2b07a93784c7e9ed4cd8365 Mon Sep 17 00:00:00 2001
From: Andrew Brain
Date: Mon, 20 Jan 2025 21:02:15 -0600
Subject: [PATCH] speed up events collection

---
 augur/tasks/github/events.py | 54 ++++++++++++++++++++++++++----------
 augur/tasks/start_tasks.py   |  2 +-
 2 files changed, 40 insertions(+), 16 deletions(-)

diff --git a/augur/tasks/github/events.py b/augur/tasks/github/events.py
index 481fc0a42..ff3dfec9b 100644
--- a/augur/tasks/github/events.py
+++ b/augur/tasks/github/events.py
@@ -3,22 +3,24 @@
 import sqlalchemy as s
 from sqlalchemy.sql import text
 from abc import ABC, abstractmethod
+from datetime import datetime, timedelta, timezone
 
 from augur.tasks.init.celery_app import celery_app as celery
 from augur.tasks.init.celery_app import AugurCoreRepoCollectionTask
 from augur.application.db.data_parse import *
 from augur.tasks.github.util.github_data_access import GithubDataAccess, UrlNotFoundException
 from augur.tasks.github.util.github_random_key_auth import GithubRandomKeyAuth
+from augur.tasks.github.util.github_task_session import GithubTaskManifest
 from augur.tasks.github.util.util import get_owner_repo
 from augur.tasks.util.worker_util import remove_duplicate_dicts
-from augur.application.db.models import PullRequestEvent, IssueEvent, Contributor, CollectionStatus
-from augur.application.db.lib import get_repo_by_repo_git, bulk_insert_dicts, get_issues_by_repo_id, get_pull_requests_by_repo_id, update_issue_closed_cntrbs_by_repo_id, get_session, get_engine
+from augur.application.db.models import PullRequestEvent, IssueEvent, Contributor, Repo
+from augur.application.db.lib import get_repo_by_repo_git, bulk_insert_dicts, get_issues_by_repo_id, get_pull_requests_by_repo_id, update_issue_closed_cntrbs_by_repo_id, get_session, get_engine, get_core_data_last_collected
 
 platform_id = 1
 
 
 @celery.task(base=AugurCoreRepoCollectionTask)
-def collect_events(repo_git: str):
+def collect_events(repo_git: str, full_collection: bool):
 
     logger = logging.getLogger(collect_events.__name__)
 
@@ -26,6 +28,14 @@ def collect_events(repo_git: str):
 
     logger.debug(f"Collecting Github events for {owner}/{repo}")
 
+    if full_collection:
+        core_data_last_collected = None
+    else:
+        repo_id = get_repo_by_repo_git(repo_git).repo_id
+
+        # subtract 2 days to ensure all data is collected
+        core_data_last_collected = (get_core_data_last_collected(repo_id) - timedelta(days=2)).replace(tzinfo=timezone.utc)
+
     key_auth = GithubRandomKeyAuth(logger)
 
     if bulk_events_collection_endpoint_contains_all_data(key_auth, logger, owner, repo):
@@ -60,7 +70,7 @@ def __init__(self, logger):
         self._data_source = "Github API"
 
     @abstractmethod
-    def collect(self, repo_git, key_auth):
+    def collect(self, repo_git, key_auth, since):
         pass
 
     def _insert_issue_events(self, events):
@@ -97,7 +107,7 @@ def __init__(self, logger):
 
         super().__init__(logger)
 
-    def collect(self, repo_git, key_auth):
+    def collect(self, repo_git, key_auth, since):
 
         repo_obj = get_repo_by_repo_git(repo_git)
         repo_id = repo_obj.repo_id
@@ -106,7 +116,7 @@ def collect(self, repo_git, key_auth):
         self.repo_identifier = f"{owner}/{repo}"
 
         events = []
-        for event in self._collect_events(repo_git, key_auth):
+        for event in self._collect_events(repo_git, key_auth, since):
             events.append(event)
 
             # making this a decent size since process_events retrieves all the issues and prs each time
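Note: the cutoff computed in collect_events() drives everything below. A full collection passes since=None; an incremental one passes the last core-collection time minus a 2-day buffer, so events that arrived while the previous run was in flight are not missed. A minimal sketch of that rule, assuming get_core_data_last_collected() returns a naive datetime already in UTC (which the bare .replace(tzinfo=timezone.utc) in the diff implies); the helper name events_since_cutoff is illustrative, not part of the patch:

```python
from datetime import datetime, timedelta, timezone
from typing import Optional

def events_since_cutoff(last_collected: Optional[datetime], full_collection: bool) -> Optional[datetime]:
    """Return the tz-aware timestamp to collect events after, or None for a full sweep."""
    if full_collection or last_collected is None:
        return None
    # 2-day buffer: events created while the previous collection was running still get picked up
    return (last_collected - timedelta(days=2)).replace(tzinfo=timezone.utc)
```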
@@ -117,7 +127,7 @@ def collect(self, repo_git, key_auth):
         if events:
             self._process_events(events, repo_id)
 
-    def _collect_events(self, repo_git: str, key_auth):
+    def _collect_events(self, repo_git: str, key_auth, since):
 
         owner, repo = get_owner_repo(repo_git)
 
@@ -125,7 +135,13 @@ def _collect_events(self, repo_git: str, key_auth):
 
         github_data_access = GithubDataAccess(key_auth, self._logger)
 
-        return github_data_access.paginate_resource(url)
+        for event in github_data_access.paginate_resource(url):
+
+            yield event
+
+            # return if the last event yielded was created before the since date
+            if since and datetime.fromisoformat(event["created_at"].replace("Z", "+00:00")).replace(tzinfo=timezone.utc) < since:
+                return
 
     def _process_events(self, events, repo_id):
 
@@ -248,7 +264,7 @@ class ThoroughGithubEventCollection(GithubEventCollection):
     def __init__(self, logger):
         super().__init__(logger)
 
-    def collect(self, repo_git, key_auth):
+    def collect(self, repo_git, key_auth, since):
 
         repo_obj = get_repo_by_repo_git(repo_git)
         repo_id = repo_obj.repo_id
@@ -256,10 +272,10 @@ def collect(self, repo_git, key_auth):
         owner, repo = get_owner_repo(repo_git)
         self.repo_identifier = f"{owner}/{repo}"
 
-        self._collect_and_process_issue_events(owner, repo, repo_id, key_auth)
-        self._collect_and_process_pr_events(owner, repo, repo_id, key_auth)
+        self._collect_and_process_issue_events(owner, repo, repo_id, key_auth, since)
+        self._collect_and_process_pr_events(owner, repo, repo_id, key_auth, since)
 
-    def _collect_and_process_issue_events(self, owner, repo, repo_id, key_auth):
+    def _collect_and_process_issue_events(self, owner, repo, repo_id, key_auth, since):
 
         engine = get_engine()
 
@@ -267,7 +283,11 @@ def _collect_and_process_issue_events(self, owner, repo, repo_id, key_auth):
 
             # TODO: Remove src id if it ends up not being needed
             query = text(f"""
-                select issue_id as issue_id, gh_issue_number as issue_number, gh_issue_id as gh_src_id from issues WHERE repo_id={repo_id} order by created_at desc;
+                select issue_id as issue_id, gh_issue_number as issue_number, gh_issue_id as gh_src_id
+                from issues
+                where repo_id={repo_id}
+                and updated_at > timestamptz(timestamp '{since}')
+                order by created_at desc;
             """)
 
             issue_result = connection.execute(query).fetchall()
@@ -309,14 +329,18 @@ def _collect_and_process_issue_events(self, owner, repo, repo_id, key_auth):
 
             events.clear()
 
-    def _collect_and_process_pr_events(self, owner, repo, repo_id, key_auth):
+    def _collect_and_process_pr_events(self, owner, repo, repo_id, key_auth, since):
 
         engine = get_engine()
 
         with engine.connect() as connection:
 
             query = text(f"""
-                select pull_request_id, pr_src_number as gh_pr_number, pr_src_id from pull_requests WHERE repo_id={repo_id} order by pr_created_at desc;
+                select pull_request_id, pr_src_number as gh_pr_number, pr_src_id
+                from pull_requests
+                where repo_id={repo_id}
+                and pr_updated_at > timestamptz(timestamp '{since}')
+                order by pr_created_at desc;
             """)
 
             pr_result = connection.execute(query).fetchall()
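Note: the early return in _collect_events relies on the bulk events endpoint returning events newest-first; once one event predates since, everything after it does too, so pagination can stop. The boundary event is still yielded before the check, so nothing at the edge is dropped. On the thorough path, the diff interpolates since into the SQL with an f-string, which would render timestamp 'None' if that path ever receives since=None (the full-collection case). A hedged sketch of one way to build the same issue query with bound parameters and an optional filter; the column names follow the diff, but build_issue_query is not code from the patch:

```python
from sqlalchemy.sql import text

def build_issue_query(repo_id: int, since=None):
    # Same projection and ordering as the patch; the incremental filter
    # is only applied when a cutoff exists, so full collections still work.
    sql = """
        select issue_id as issue_id, gh_issue_number as issue_number, gh_issue_id as gh_src_id
        from issues
        where repo_id = :repo_id
    """
    params = {"repo_id": repo_id}
    if since is not None:
        sql += " and updated_at > :since"
        params["since"] = since
    sql += " order by created_at desc"
    return text(sql).bindparams(**params)
```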
diff --git a/augur/tasks/start_tasks.py b/augur/tasks/start_tasks.py
index 8aa767ece..ae0c6c299 100644
--- a/augur/tasks/start_tasks.py
+++ b/augur/tasks/start_tasks.py
@@ -73,7 +73,7 @@ def primary_repo_collect_phase(repo_git, full_collection):
 
     #Define secondary group that can't run until after primary jobs have finished.
     secondary_repo_jobs = group(
-        collect_events.si(repo_git),#*create_grouped_task_load(dataList=first_pass, task=collect_events).tasks,
+        collect_events.si(repo_git, full_collection),#*create_grouped_task_load(dataList=first_pass, task=collect_events).tasks,
         collect_github_messages.si(repo_git, full_collection), #*create_grouped_task_load(dataList=first_pass,task=collect_github_messages).tasks,
         collect_github_repo_clones_data.si(repo_git),
     )
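Note: with this wiring, the full_collection flag set by the scheduler flows straight through to the events task. For a rough sense of the two modes (illustrative invocation only, assuming a running Celery worker; the repo URL is an example):

```python
# Full sweep: since=None, every event page is walked.
collect_events.si("https://github.com/chaoss/augur", True).apply_async()

# Incremental: pagination stops once events predate the last core
# collection minus the 2-day buffer.
collect_events.si("https://github.com/chaoss/augur", False).apply_async()
```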