Skip to content

Commit

Permalink
speed up events collection
Browse files Browse the repository at this point in the history
  • Loading branch information
ABrain7710 committed Jan 21, 2025
1 parent 6cd0ddb commit 15a212f
Show file tree
Hide file tree
Showing 2 changed files with 40 additions and 16 deletions.
54 changes: 39 additions & 15 deletions augur/tasks/github/events.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,29 +3,39 @@
import sqlalchemy as s
from sqlalchemy.sql import text
from abc import ABC, abstractmethod
from datetime import datetime, timedelta, timezone

from augur.tasks.init.celery_app import celery_app as celery
from augur.tasks.init.celery_app import AugurCoreRepoCollectionTask
from augur.application.db.data_parse import *
from augur.tasks.github.util.github_data_access import GithubDataAccess, UrlNotFoundException
from augur.tasks.github.util.github_random_key_auth import GithubRandomKeyAuth
from augur.tasks.github.util.github_task_session import GithubTaskManifest
from augur.tasks.github.util.util import get_owner_repo
from augur.tasks.util.worker_util import remove_duplicate_dicts
from augur.application.db.models import PullRequestEvent, IssueEvent, Contributor, CollectionStatus
from augur.application.db.lib import get_repo_by_repo_git, bulk_insert_dicts, get_issues_by_repo_id, get_pull_requests_by_repo_id, update_issue_closed_cntrbs_by_repo_id, get_session, get_engine
from augur.application.db.models import PullRequestEvent, IssueEvent, Contributor, Repo
from augur.application.db.lib import get_repo_by_repo_git, bulk_insert_dicts, get_issues_by_repo_id, get_pull_requests_by_repo_id, update_issue_closed_cntrbs_by_repo_id, get_session, get_engine, get_core_data_last_collected


platform_id = 1

@celery.task(base=AugurCoreRepoCollectionTask)
def collect_events(repo_git: str):
def collect_events(repo_git: str, full_collection: bool):

logger = logging.getLogger(collect_events.__name__)

owner, repo = get_owner_repo(repo_git)

logger.debug(f"Collecting Github events for {owner}/{repo}")

if full_collection:
core_data_last_collected = None
else:
repo_id = get_repo_by_repo_git().repo_id

# subtract 2 days to ensure all data is collected
core_data_last_collected = (get_core_data_last_collected(repo_id) - timedelta(days=2)).replace(tzinfo=timezone.utc)

key_auth = GithubRandomKeyAuth(logger)

if bulk_events_collection_endpoint_contains_all_data(key_auth, logger, owner, repo):
Expand Down Expand Up @@ -60,7 +70,7 @@ def __init__(self, logger):
self._data_source = "Github API"

@abstractmethod
def collect(self, repo_git, key_auth, since):
    """Collect and store GitHub events for the given repository.

    Args:
        repo_git: Repository git URL (https://github.com/<owner>/<repo>).
        key_auth: GithubRandomKeyAuth used to authenticate GitHub API calls.
        since: Timezone-aware datetime cutoff — events older than this may be
            skipped by implementations. None means a full collection.
    """
    pass

def _insert_issue_events(self, events):
Expand Down Expand Up @@ -97,7 +107,7 @@ def __init__(self, logger):

super().__init__(logger)

def collect(self, repo_git, key_auth):
def collect(self, repo_git, key_auth, since):

repo_obj = get_repo_by_repo_git(repo_git)
repo_id = repo_obj.repo_id
Expand All @@ -106,7 +116,7 @@ def collect(self, repo_git, key_auth):
self.repo_identifier = f"{owner}/{repo}"

events = []
for event in self._collect_events(repo_git, key_auth):
for event in self._collect_events(repo_git, key_auth, since):
events.append(event)

# making this a decent size since process_events retrieves all the issues and prs each time
Expand All @@ -117,15 +127,21 @@ def collect(self, repo_git, key_auth):
if events:
self._process_events(events, repo_id)

def _collect_events(self, repo_git: str, key_auth, since):
    """Yield issue events for the repo, stopping early once *since* is passed.

    Pages through the GitHub ``issues/events`` endpoint and yields each raw
    event dict. When *since* is set, pagination stops after the first event
    *created* before that cutoff (that event is still yielded).

    Args:
        repo_git: Repository git URL.
        key_auth: GithubRandomKeyAuth for API authentication.
        since: Timezone-aware datetime cutoff, or None to collect everything.

    Yields:
        dict: One GitHub event payload per iteration.
    """

    owner, repo = get_owner_repo(repo_git)

    url = f"https://api.github.com/repos/{owner}/{repo}/issues/events"

    github_data_access = GithubDataAccess(key_auth, self._logger)

    for event in github_data_access.paginate_resource(url):

        yield event

        # Stop paginating once the event just yielded was CREATED before the
        # cutoff (the original comment said "updated", but the code compares
        # created_at).
        # NOTE(review): early exit assumes the endpoint returns events
        # newest-first — confirm ordering before relying on completeness.
        if since and datetime.fromisoformat(event["created_at"].replace("Z", "+00:00")).replace(tzinfo=timezone.utc) < since:
            return

def _process_events(self, events, repo_id):

Expand Down Expand Up @@ -248,26 +264,30 @@ class ThoroughGithubEventCollection(GithubEventCollection):
def __init__(self, logger):
    # Initialization is entirely delegated to the GithubEventCollection base
    # class; this subclass adds no state of its own.
    super().__init__(logger)

def collect(self, repo_git, key_auth, since):
    """Run a thorough (per-issue / per-PR) event collection for one repo.

    Resolves the repo's database id, records an "owner/repo" identifier used
    for logging, then collects issue events and pull-request events in two
    separate passes.

    Args:
        repo_git: Repository git URL.
        key_auth: GithubRandomKeyAuth for API authentication.
        since: Cutoff passed to the issue/PR collectors; only records updated
            after this point are queried. None is intended to mean "all".
    """

    repo_obj = get_repo_by_repo_git(repo_git)
    repo_id = repo_obj.repo_id

    owner, repo = get_owner_repo(repo_git)
    self.repo_identifier = f"{owner}/{repo}"

    # NOTE(review): when since is None, the downstream SQL interpolates
    # "timestamp 'None'", which Postgres will reject — confirm full
    # collections never reach this path, or guard the filter on None.
    self._collect_and_process_issue_events(owner, repo, repo_id, key_auth, since)
    self._collect_and_process_pr_events(owner, repo, repo_id, key_auth, since)

def _collect_and_process_issue_events(self, owner, repo, repo_id, key_auth):
def _collect_and_process_issue_events(self, owner, repo, repo_id, key_auth, since):

engine = get_engine()

with engine.connect() as connection:

# TODO: Remove src id if it ends up not being needed
query = text(f"""
select issue_id as issue_id, gh_issue_number as issue_number, gh_issue_id as gh_src_id from issues WHERE repo_id={repo_id} order by created_at desc;
select issue_id as issue_id, gh_issue_number as issue_number, gh_issue_id as gh_src_id
from issues
where repo_id={repo_id}
and updated_at > timestamptz(timestamp '{since}')
order by created_at desc;
""")

issue_result = connection.execute(query).fetchall()
Expand Down Expand Up @@ -309,14 +329,18 @@ def _collect_and_process_issue_events(self, owner, repo, repo_id, key_auth):
events.clear()


def _collect_and_process_pr_events(self, owner, repo, repo_id, key_auth):
def _collect_and_process_pr_events(self, owner, repo, repo_id, key_auth, since):

engine = get_engine()

with engine.connect() as connection:

query = text(f"""
select pull_request_id, pr_src_number as gh_pr_number, pr_src_id from pull_requests WHERE repo_id={repo_id} order by pr_created_at desc;
select pull_request_id, pr_src_number as gh_pr_number, pr_src_id
from pull_requests
where repo_id={repo_id}
and pr_updated_at > timestamptz(timestamp '{since}')
order by pr_created_at desc;
""")

pr_result = connection.execute(query).fetchall()
Expand Down
2 changes: 1 addition & 1 deletion augur/tasks/start_tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ def primary_repo_collect_phase(repo_git, full_collection):

#Define secondary group that can't run until after primary jobs have finished.
# collect_events and collect_github_messages receive full_collection so they
# can skip already-collected data on incremental runs.
secondary_repo_jobs = group(
    collect_events.si(repo_git, full_collection),#*create_grouped_task_load(dataList=first_pass, task=collect_events).tasks,
    collect_github_messages.si(repo_git, full_collection), #*create_grouped_task_load(dataList=first_pass,task=collect_github_messages).tasks,
    collect_github_repo_clones_data.si(repo_git),
)
Expand Down

0 comments on commit 15a212f

Please sign in to comment.