Skip to content

Commit

Permalink
updated endpoints, added job to load github metadata, expanded streak…
Browse files Browse the repository at this point in the history
… table
  • Loading branch information
akorchyn committed Jun 7, 2024
1 parent 6082364 commit 68c8164
Show file tree
Hide file tree
Showing 16 changed files with 397 additions and 131 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions server/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,5 +20,6 @@ usvg = { workspace = true, features = ["text"] }
rocket_prometheus.workspace = true
utoipa = { workspace = true, features = ["rocket_extras", "chrono"] }
utoipa-swagger-ui = { workspace = true, features = ["rocket"] }
octocrab.workspace = true

shared = { workspace = true, features = ["client"] }
63 changes: 63 additions & 0 deletions server/migrations/20240607131417_streak_repo_metadata.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
BEGIN;

-- Step 1: Create the new streak table
CREATE TABLE IF NOT EXISTS streak (
id INTEGER PRIMARY KEY,
name TEXT UNIQUE NOT NULL,
period TEXT NOT NULL
);

-- Step 2: Insert any missing streak entries into the new streak table
INSERT INTO
streak (id, name, period)
VALUES
(0, 'Weekly Pull Request', 'Weekly'),
(
1,
'Monthly Pull Request with score higher 8',
'Monthly'
);

ALTER TABLE
streak_user_data
ADD
COLUMN new_streak_id INTEGER;

-- Step 4: Copy data from the old streak_id column to the new_streak_id column
UPDATE
streak_user_data
SET
new_streak_id = streak_user_data.streak_id;

-- Step 5: Drop the old streak_id column
ALTER TABLE
streak_user_data DROP COLUMN streak_id;

-- Step 6: Rename the new_streak_id column to streak_id
ALTER TABLE
streak_user_data RENAME COLUMN new_streak_id TO streak_id;

-- Step 7: Add the foreign key constraint to the new streak_id column
ALTER TABLE
streak_user_data
ADD
CONSTRAINT fk_streak_id FOREIGN KEY (streak_id) REFERENCES streak(id) ON DELETE CASCADE;

-- Step 8: Re-add the primary key constraint
ALTER TABLE
streak_user_data
ADD
PRIMARY KEY (user_id, streak_id);

ALTER TABLE
repos
ADD
COLUMN primary_language TEXT,
ADD
COLUMN open_issues INTEGER,
ADD
COLUMN stars INTEGER,
ADD
COLUMN forks INTEGER;

COMMIT;
3 changes: 3 additions & 0 deletions server/sql/get_leaderboard.sql
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,14 @@ SELECT
prs_merged,
best as streak_best,
amount as streak_amount,
period as streak_type,
streak.name as streak_name,
latest_time_string as streak_latest_time_string
FROM
user_period_data
JOIN users ON users.id = user_period_data.user_id
JOIN streak_user_data ON streak_user_data.user_id = users.id
JOIN streak ON streak.id = streak_user_data.streak_id
WHERE
period_type = $1
and streak_user_data.streak_id = $2
Expand Down
12 changes: 10 additions & 2 deletions server/sql/get_repo_leaderboard.sql
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,11 @@ SELECT
r.name AS name,
COALESCE(COUNT(pr.id), 0) AS total_prs,
COALESCE(SUM(pr.score), 0) AS total_score,
tc.contributor_name AS top_contributor
tc.contributor_name AS top_contributor,
r.primary_language,
r.open_issues,
r.stars,
r.forks
FROM
repos r
JOIN organizations o ON r.organization_id = o.id
Expand All @@ -30,7 +34,11 @@ FROM
GROUP BY
o.name,
r.name,
tc.contributor_name
tc.contributor_name,
r.primary_language,
r.open_issues,
r.stars,
r.forks
ORDER BY
total_score DESC,
total_prs DESC
Expand Down
7 changes: 7 additions & 0 deletions server/sql/get_repos.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
select
r.id as repo_id,
r.name as repo,
o.name as organization
from
repos as r
JOIN organizations o ON r.organization_id = o.id
12 changes: 12 additions & 0 deletions server/sql/get_streaks_for_user_id.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
SELECT
streak.id as streak_id,
name,
period as streak_type,
amount,
best,
latest_time_string
FROM
streak_user_data
JOIN streak ON streak.id = streak_user_data.streak_id
WHERE
user_id = $1
86 changes: 86 additions & 0 deletions server/src/contract_pull.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
use std::{
sync::{atomic::AtomicBool, Arc},
time::Duration,
};

use chrono::DateTime;
use rocket::fairing::AdHoc;
use rocket_db_pools::Database;
use shared::{near::NearClient, TimePeriod};

use crate::db::DB;

async fn fetch_and_store_users(near_client: &NearClient, db: &DB) -> anyhow::Result<()> {
let timestamp = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)?
.as_nanos();
let periods = [TimePeriod::Month, TimePeriod::Quarter, TimePeriod::AllTime]
.into_iter()
.map(|e| e.time_string(timestamp as u64))
.collect();
let users = near_client.users(periods).await?;
for user in users {
let user_id = db.upsert_user(&user.name).await?;
for (period, data) in user.period_data {
db.upsert_user_period_data(period, &data, user_id).await?;
}
for (streak_id, streak_data) in user.streaks {
db.upsert_streak_user_data(&streak_data, streak_id as i32, user_id)
.await?;
}
}

Ok(())
}

async fn fetch_and_store_prs(near_client: &NearClient, db: &DB) -> anyhow::Result<()> {
let prs = near_client.prs().await?;
for (pr, executed) in prs {
let organization_id = db.upsert_organization(&pr.organization).await?;
let repo_id = db.upsert_repo(organization_id, &pr.repo).await?;
let author_id = db.upsert_user(&pr.author).await?;
let _ = db
.upsert_pull_request(
repo_id,
pr.number as i32,
author_id,
DateTime::from_timestamp_nanos(pr.created_at as i64).naive_utc(),
pr.merged_at
.map(|t| DateTime::from_timestamp_nanos(t as i64).naive_utc()),
pr.score(),
executed,
)
.await?;
}
Ok(())
}

async fn fetch_and_store_all_data(near_client: &NearClient, db: &DB) -> anyhow::Result<()> {
fetch_and_store_users(near_client, db).await?;
fetch_and_store_prs(near_client, db).await?;
Ok(())
}

pub fn stage(client: NearClient, sleep_duration: Duration, atomic_bool: Arc<AtomicBool>) -> AdHoc {
rocket::fairing::AdHoc::on_liftoff("Load users from Near every X minutes", move |rocket| {
Box::pin(async move {
// Get an actual DB connection
let db = DB::fetch(rocket)
.expect("Failed to get DB connection")
.clone();

rocket::tokio::spawn(async move {
let mut interval = rocket::tokio::time::interval(sleep_duration);
let near_client = client;
while atomic_bool.load(std::sync::atomic::Ordering::Relaxed) {
interval.tick().await;

// Execute a query of some kind
if let Err(e) = fetch_and_store_all_data(&near_client, &db).await {
rocket::error!("Failed to fetch and store data: {:#?}", e);
}
}
});
})
})
}
56 changes: 42 additions & 14 deletions server/src/db/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,10 @@ pub mod types;

use types::LeaderboardRecord;

use self::types::{RepoRecord, StreakRecord, UserContributionRecord, UserPeriodRecord, UserRecord};
use self::types::{
RepoLeaderboardRecord, RepoRecord, StreakRecord, UserContributionRecord, UserPeriodRecord,
UserRecord,
};

impl DB {
pub async fn upsert_user(&self, user: &str) -> anyhow::Result<i32> {
Expand Down Expand Up @@ -153,6 +156,30 @@ impl DB {
Ok(())
}

pub async fn update_repo_metadata(
&self,
repo_id: i32,
stars: u32,
forks: u32,
open_issues: u32,
primary_language: Option<String>,
) -> anyhow::Result<()> {
sqlx::query!(
r#"
UPDATE repos
SET stars = $1, forks = $2, open_issues = $3, primary_language = $4
WHERE id = $5"#,
stars as i32,
forks as i32,
open_issues as i32,
primary_language,
repo_id
)
.execute(&self.0)
.await?;
Ok(())
}

pub async fn get_user(
&self,
name: &str,
Expand All @@ -178,17 +205,10 @@ impl DB {
.fetch_all(&self.0)
.await?;

let streak_recs: Vec<StreakRecord> = sqlx::query_as!(
StreakRecord,
r#"
SELECT streak_id, amount, best, latest_time_string
FROM streak_user_data
WHERE user_id = $1
"#,
user_rec
)
.fetch_all(&self.0)
.await?;
let streak_recs: Vec<StreakRecord> =
sqlx::query_file_as!(StreakRecord, "./sql/get_streaks_for_user_id.sql", user_rec)
.fetch_all(&self.0)
.await?;

let mut leaderboard_places = Vec::with_capacity(place_strings.len());
for place in place_strings {
Expand Down Expand Up @@ -254,12 +274,12 @@ impl DB {
&self,
page: i64,
limit: i64,
) -> anyhow::Result<(Vec<RepoRecord>, u64)> {
) -> anyhow::Result<(Vec<RepoLeaderboardRecord>, u64)> {
let offset = page * limit;
// COALESCE is used to return 0 if there are no PRs for a repo
// But sqlx still thinks that it's NONE
let records = sqlx::query_file_as_unchecked!(
RepoRecord,
RepoLeaderboardRecord,
"./sql/get_repo_leaderboard.sql",
limit,
offset
Expand Down Expand Up @@ -339,6 +359,14 @@ impl DB {
.map(|r| (r.name, r.total_score.unwrap_or_default()))
.collect())
}

pub async fn get_repos(&self) -> anyhow::Result<Vec<RepoRecord>> {
let rec = sqlx::query_file_as!(RepoRecord, "./sql/get_repos.sql")
.fetch_all(&self.0)
.await?;

Ok(rec)
}
}

async fn run_migrations(rocket: Rocket<Build>) -> fairing::Result {
Expand Down
17 changes: 16 additions & 1 deletion server/src/db/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ pub struct LeaderboardRecord {
pub prs_merged: i32,
pub streak_best: i32,
pub streak_amount: i32,
pub streak_name: String,
pub streak_type: String,
pub streak_latest_time_string: String,
}

Expand All @@ -27,6 +29,8 @@ pub struct UserPeriodRecord {
#[derive(Debug, Clone, sqlx::FromRow, Serialize, Deserialize, Default)]
pub struct StreakRecord {
pub streak_id: i32,
pub name: String,
pub streak_type: String,
pub amount: i32,
pub best: i32,
pub latest_time_string: String,
Expand All @@ -52,12 +56,16 @@ impl UserRecord {
}

#[derive(Debug, Clone, sqlx::FromRow, Serialize, Deserialize)]
pub struct RepoRecord {
pub struct RepoLeaderboardRecord {
pub organization: String,
pub name: String,
pub total_prs: i64,
pub total_score: i64,
pub top_contributor: GithubHandle,
pub stars: i32,
pub open_issues: i32,
pub primary_language: Option<String>,
pub forks: i32,
}

#[derive(Debug, Clone, sqlx::FromRow, Serialize, Deserialize)]
Expand All @@ -71,3 +79,10 @@ pub struct UserContributionRecord {
pub created_at: chrono::NaiveDateTime,
pub merged_at: Option<chrono::NaiveDateTime>,
}

#[derive(Debug, Clone, sqlx::FromRow, Serialize, Deserialize)]
pub struct RepoRecord {
pub organization: String,
pub repo: String,
pub repo_id: i32,
}
Loading

0 comments on commit 68c8164

Please sign in to comment.