refactor: refactor FlushRepoTask #246

Merged · 2 commits · Jan 31, 2024

134 changes: 101 additions & 33 deletions tasks/flush_repo.py
@@ -1,8 +1,11 @@
import logging
from dataclasses import dataclass
from typing import Optional

import sentry_sdk

from app import celery_app
from database.engine import Session
from database.models import (
Branch,
Commit,
@@ -32,16 +35,28 @@
log = logging.getLogger(__name__)


@dataclass
class FlushRepoTaskReturnType(object):
error: Optional[str] = None
deleted_commits_count: int = 0
delete_branches_count: int = 0
deleted_pulls_count: int = 0
deleted_archives_count: int = 0


class FlushRepoTask(BaseCodecovTask, name="app.tasks.flush_repo.FlushRepo"):
@sentry_sdk.trace
def _delete_archive(self, repo: Repository) -> int:
archive_service = ArchiveService(repo)
deleted_archives = archive_service.delete_repo_files()
Contributor: Should this fn be async? I suppose we don't necessarily care to wait for the files to be deleted, but we don't know for certain that the call succeeded otherwise, right?

Contributor (Author): The implementation of the ArchiveService (and the underlying storage systems) is not async. Even if I wanted to make it async, refactoring that whole service is far beyond the scope of this ticket.

I actually think we would benefit from making it async. The same goes for the DB interactions (there are certain queries we could parallelize there), but we are constrained by the interfaces we have at hand.

Contributor: Yeah, agreed, not part of this scope. I just found it interesting that the archive service wasn't async in the first place; makes sense.

log.info(
"Deleted archives from storage",
extra=dict(repoid=repo.repoid, deleted_archives_count=deleted_archives),
)
return deleted_archives
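
A minimal sketch of the async variant discussed in the thread above, not part of this PR: it assumes Python 3.9+ for asyncio.to_thread, keeps ArchiveService.delete_repo_files synchronous as in the diff, and the method name _delete_archive_async is hypothetical.

import asyncio

# Sketch only: offload the blocking storage call to a worker thread so an
# async caller is not blocked, while still awaiting the result, so the
# caller keeps the deleted-files count and knows whether the call succeeded.
async def _delete_archive_async(self, repo: Repository) -> int:
    archive_service = ArchiveService(repo)
    deleted_archives = await asyncio.to_thread(archive_service.delete_repo_files)
    return deleted_archives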

@sentry_sdk.trace
def _delete_comparisons(self, db_session: Session, commit_ids, repoid: int) -> None:
commit_comparison_ids = db_session.query(CompareCommit.id_).filter(
CompareCommit.base_commit_id.in_(commit_ids)
| CompareCommit.compare_commit_id.in_(commit_ids)
@@ -55,11 +70,10 @@
| CompareCommit.compare_commit_id.in_(commit_ids)
).delete(synchronize_session=False)
db_session.commit()
log.info("Deleted comparisons", extra=dict(repoid=repoid))

@sentry_sdk.trace
def _delete_reports(self, db_session: Session, report_ids, repoid: int):
Contributor: Ty for all these helper methods, a lot more organized and easy to read.

db_session.query(ReportDetails).filter(
ReportDetails.report_id.in_(report_ids)
).delete(synchronize_session=False)
@@ -70,7 +84,10 @@
ReportResults.report_id.in_(report_ids)
).delete(synchronize_session=False)
db_session.commit()
log.info("Deleted reports", extra=dict(repoid=repoid))

@sentry_sdk.trace
def _delete_uploads(self, db_session: Session, report_ids, repoid: int):
# uploads
upload_ids = db_session.query(Upload.id_).filter(
Upload.report_id.in_(report_ids)
@@ -84,62 +101,113 @@
db_session.query(uploadflagmembership).filter(
uploadflagmembership.c.upload_id.in_(upload_ids)
).delete(synchronize_session=False)
db_session.commit()

db_session.query(Upload).filter(Upload.report_id.in_(report_ids)).delete(
synchronize_session=False
)
db_session.commit()
log.info("Deleted uploads", extra=dict(repoid=repoid))

@sentry_sdk.trace
def _delete_commit_details(self, db_session: Session, commit_ids, repoid: int):
db_session.query(CommitReport).filter(
CommitReport.commit_id.in_(commit_ids)
).delete(synchronize_session=False)
# TODO: component comparisons
db_session.query(RepositoryFlag).filter_by(repository_id=repoid).delete()
db_session.commit()

db_session.query(CommitError).filter(
CommitError.commit_id.in_(commit_ids)
).delete(synchronize_session=False)
db_session.query(CommitNotification).filter(
CommitNotification.commit_id.in_(commit_ids)
).delete(synchronize_session=False)
db_session.commit()
log.info("Deleted commit details", extra=dict(repoid=repoid))

# static analysis
@sentry_sdk.trace
def _delete_static_analysis(self, db_session: Session, commit_ids, repoid: int):
db_session.query(StaticAnalysisSuite).filter(
StaticAnalysisSuite.commit_id.in_(commit_ids)
).delete(synchronize_session=False)
snapshot_ids = db_session.query(StaticAnalysisSingleFileSnapshot.id_).filter_by(
repository_id=repoid
)
db_session.query(StaticAnalysisSuiteFilepath).filter(
StaticAnalysisSuiteFilepath.file_snapshot_id.in_(snapshot_ids)
).delete(synchronize_session=False)
db_session.query(StaticAnalysisSingleFileSnapshot).filter_by(
repository_id=repoid
).delete()
db_session.commit()
log.info("Deleted static analysis info", extra=dict(repoid=repoid))

# label analysis
@sentry_sdk.trace
def _delete_label_analysis(self, db_session: Session, commit_ids, repoid: int):
db_session.query(LabelAnalysisRequest).filter(
LabelAnalysisRequest.base_commit_id.in_(commit_ids)
| LabelAnalysisRequest.head_commit_id.in_(commit_ids)
).delete(synchronize_session=False)
db_session.commit()
log.info("Deleted label analysis info", extra=dict(repoid=repoid))

Check warning (Codecov, Codecov - Staging, Codecov - QA, Codecov Public QA): added line tasks/flush_repo.py#L150 was not covered by tests.

@sentry_sdk.trace
def _delete_commits(self, db_session: Session, repoid: int) -> int:
deleted_commits = db_session.query(Commit).filter_by(repoid=repoid).delete()
db_session.commit()
log.info("Deleted commits", extra=dict(repoid=repoid))
return deleted_commits

@sentry_sdk.trace
def _delete_branches(self, db_session: Session, repoid: int) -> int:
deleted_branches = db_session.query(Branch).filter_by(repoid=repoid).delete()
db_session.commit()
log.info("Deleted branches", extra=dict(repoid=repoid))
return deleted_branches

@sentry_sdk.trace
def _delete_pulls(self, db_session: Session, repoid: int) -> int:
deleted_pulls = db_session.query(Pull).filter_by(repoid=repoid).delete()
db_session.commit()
log.info("Deleted pulls", extra=dict(repoid=repoid))
return deleted_pulls

@sentry_sdk.trace
async def run_async(
self, db_session: Session, *, repoid: int, **kwargs
) -> FlushRepoTaskReturnType:
log.info("Deleting repo contents", extra=dict(repoid=repoid))
repo = db_session.query(Repository).filter_by(repoid=repoid).first()
if repo is None:
log.exception("Repo not found", extra=dict(repoid=repoid))
return FlushRepoTaskReturnType(error="repo not found")

Check warning (Codecov, Codecov - Staging, Codecov - QA, Codecov Public QA): added lines tasks/flush_repo.py#L180-L181 were not covered by tests.

deleted_archives = self._delete_archive(repo)
with sentry_sdk.start_span("query_commit_ids"):
commit_ids = db_session.query(Commit.id_).filter_by(repoid=repo.repoid)
self._delete_comparisons(db_session, commit_ids, repoid)

with sentry_sdk.start_span("query_report_ids"):
report_ids = db_session.query(CommitReport.id_).filter(
CommitReport.commit_id.in_(commit_ids)
)
self._delete_reports(db_session, report_ids, repoid)
self._delete_uploads(db_session, report_ids, repoid)

self._delete_commit_details(db_session, commit_ids, repoid)

# TODO: Component comparison
Contributor: Would we still leave the commit and component comparisons in the DB? Since commit comparisons rely on commit ids existing, wouldn't the delete_commits fn below fail?

Contributor (Author): Yes, this is true. And it's not the only piece of data that we should be removing but aren't (LabelAnalysisRequestProcessingError has the same issue).

But I wanted to be very intentional with these changes and limit them to a refactor, for a couple of reasons:

  1. Checking all the missing pieces of data, adding them in, and adding tests for all of that consumes a lot more time.
  2. These changes are meant to give us more information so we can make better decisions on how to improve the feature overall. We might conclude that we need to tear the whole thing apart and start from scratch, in which case the extra effort spent adding the missing details would be lost.
  3. Currently there's no "epic" or "plan" for how to tackle this flow. That needs to be discussed and prioritized, and I personally think the knowledge that some data is not being deleted when it should be needs to factor into "how big is this project going to be?"

So while you are absolutely right that we should (and will) fix it, I hope we can ignore the fact that this is partially broken for now. It is one of the reasons I left the comment there. The comment also captures some of the data that we know is missing, in case you're worried it'll be lost.

Contributor: Yeah, big time. I didn't mean to imply we needed to add those; this should be contained to a refactor. I was mostly trying to see if what I said made sense, still learning the ropes here 😅. And ty for writing down the things we're missing somewhere too 👌
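
A hypothetical sketch of the missing cleanup discussed in this thread, written as a method in the style of the other helpers in this refactor. The ComponentComparison model and its commit_comparison_id column are assumptions; neither appears in this diff.

# Sketch only: the model name and column are assumed, not taken from the PR.
@sentry_sdk.trace
def _delete_component_comparisons(
    self, db_session: Session, commit_comparison_ids, repoid: int
) -> None:
    db_session.query(ComponentComparison).filter(
        ComponentComparison.commit_comparison_id.in_(commit_comparison_ids)
    ).delete(synchronize_session=False)
    db_session.commit()
    log.info("Deleted component comparisons", extra=dict(repoid=repoid))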


self._delete_static_analysis(db_session, commit_ids, repoid)

deleted_commits = self._delete_commits(db_session, repoid)
deleted_branches = self._delete_branches(db_session, repoid)
deleted_pulls = self._delete_pulls(db_session, repoid)
repo.yaml = None
return FlushRepoTaskReturnType(
deleted_archives_count=deleted_archives,
deleted_commits_count=deleted_commits,
delete_branches_count=deleted_branches,
deleted_pulls_count=deleted_pulls,
)


FlushRepo = celery_app.register_task(FlushRepoTask())
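
For readers following the instrumentation added throughout this diff, a standalone sketch of the same pattern. The function names are illustrative, and sentry-sdk 1.x-style tracing APIs (trace, start_transaction, start_span) with an initialized SDK are assumed.

import sentry_sdk

@sentry_sdk.trace  # records the whole function as a span in the active transaction
def cleanup_step() -> int:
    return 0  # stand-in for one of the bulk deletes

def flush_example() -> None:
    with sentry_sdk.start_transaction(name="flush-repo-example"):
        with sentry_sdk.start_span(description="query_ids"):
            pass  # stand-in for building an id query
        cleanup_step()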
61 changes: 33 additions & 28 deletions tasks/tests/unit/test_flush_repo.py
@@ -1,8 +1,5 @@
import pytest

from database.tests.factories import (
BranchFactory,
CommitFactory,
@@ -12,7 +9,7 @@
)
from database.tests.factories.reports import CompareFlagFactory, RepositoryFlagFactory
from services.archive import ArchiveService
from tasks.flush_repo import FlushRepoTask
from tasks.flush_repo import FlushRepoTask, FlushRepoTaskReturnType


class TestFlushRepo(object):
@@ -23,12 +20,14 @@ async def test_flush_repo_nothing(self, dbsession, mock_storage):
dbsession.add(repo)
dbsession.flush()
res = await task.run_async(dbsession, repoid=repo.repoid)
assert res == FlushRepoTaskReturnType(
**{
"delete_branches_count": 0,
"deleted_archives_count": 0,
"deleted_commits_count": 0,
"deleted_pulls_count": 0,
}
)

@pytest.mark.asyncio
async def test_flush_repo_few_of_each_only_db_objects(
@@ -65,12 +64,14 @@ async def test_flush_repo_few_of_each_only_db_objects(
dbsession.add(branch)
dbsession.flush()
res = await task.run_async(dbsession, repoid=repo.repoid)
assert res == FlushRepoTaskReturnType(
**{
"delete_branches_count": 23,
"deleted_archives_count": 0,
"deleted_commits_count": 16,
"deleted_pulls_count": 17,
}
)

@pytest.mark.asyncio
async def test_flush_repo_only_archives(self, dbsession, mock_storage):
Expand All @@ -82,12 +83,14 @@ async def test_flush_repo_only_archives(self, dbsession, mock_storage):
archive_service.write_chunks(f"commit_sha{i}", f"data{i}")
task = FlushRepoTask()
res = await task.run_async(dbsession, repoid=repo.repoid)
assert res == FlushRepoTaskReturnType(
**{
"delete_branches_count": 0,
"deleted_archives_count": 4,
"deleted_commits_count": 0,
"deleted_pulls_count": 0,
}
)

@pytest.mark.asyncio
async def test_flush_repo_little_bit_of_everything(self, dbsession, mock_storage):
Expand All @@ -109,12 +112,14 @@ async def test_flush_repo_little_bit_of_everything(self, dbsession, mock_storage
archive_service.write_chunks(f"commit_sha{i}", f"data{i}")
task = FlushRepoTask()
res = await task.run_async(dbsession, repoid=repo.repoid)
assert res == FlushRepoTaskReturnType(
**{
"delete_branches_count": 23,
"deleted_archives_count": 4,
"deleted_commits_count": 8,
"deleted_pulls_count": 17,
}
)
dbsession.flush()
dbsession.refresh(repo)
# Those assertions are almost tautological. If they start being a