Skip to content

Commit

Permalink
fix/no unsubscribes (#16)
Browse files Browse the repository at this point in the history
* fix: remove unsubscribe logic to see if that helps...

* feat: properly update updated_at column
  • Loading branch information
LukasDeco authored Jun 25, 2024
1 parent bff7f28 commit ab2aa8c
Show file tree
Hide file tree
Showing 4 changed files with 23 additions and 30 deletions.
32 changes: 6 additions & 26 deletions src/entrypoints/events/rpc_token_acct_updates.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use std::env;
use std::sync::{Arc, MutexGuard};

use chrono::Utc;
use deadpool::managed::Object;
use deadpool_diesel::Manager;
use diesel::{update, ExpressionMethods, PgConnection, QueryDsl, RunQueryDsl};
Expand Down Expand Up @@ -66,32 +67,8 @@ pub async fn new_handler(
token_acct_pubkey.to_string()
);

let (mut subscription, unsubscribe) = account_subscribe_res.ok().unwrap();
let (mut subscription, _) = account_subscribe_res.ok().unwrap();

let conn_manager_clone = Arc::clone(&conn_manager);
task::spawn(async move {
loop {
// 10 minute timeout for the length of a session
tokio::time::sleep(std::time::Duration::new(600, 0)).await;
let mut timeout_flag_val: MutexGuard<bool> = timeout_flag_arc.lock().unwrap();
if *timeout_flag_val {
break;
}

*timeout_flag_val = false;
}
println!(
"timed out. unsubscribing from account: {}",
token_acct_pubkey.to_string()
);
update_token_acct_with_status(
token_acct_pubkey.to_string(),
TokenAcctStatus::Enabled,
conn_manager_clone,
)
.await;
unsubscribe().await;
});
let conn_manager_clone_sub = Arc::clone(&conn_manager);
while let Some(val) = subscription.next().await {
let mut timeout_flag_val = timeout_flag.lock().unwrap();
Expand Down Expand Up @@ -223,7 +200,10 @@ async fn update_token_acct_with_status(
update(
token_accts::table.filter(token_accts::token_acct.eq(token_acct_query.to_string())),
)
.set(token_accts::dsl::status.eq(status_for_set))
.set((
token_accts::dsl::status.eq(status_for_set),
token_accts::dsl::updated_at.eq(Utc::now()),
))
.get_result::<TokenAcct>(db)
})
.await;
Expand Down
6 changes: 5 additions & 1 deletion src/entrypoints/http/post_watch_token_acct.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use crate::entities::token_accts::TokenAcct;
use crate::entities::token_accts::TokenAcctStatus;
use crate::entities::token_accts::WatchTokenBalancePayload;
use crate::entities::token_accts::WatchTokenBalanceResponse;
use chrono::Utc;
use deadpool::managed::Object;
use deadpool_diesel::Manager;
use diesel::prelude::*;
Expand Down Expand Up @@ -146,6 +147,9 @@ fn update_token_acct_with_status(
db: &mut PgConnection,
) -> Result<TokenAcct, diesel::result::Error> {
update(token_accts::table.filter(token_accts::token_acct.eq(token_acct_pubkey.to_string())))
.set(token_accts::dsl::status.eq(token_acct_status.clone()))
.set((
token_accts::dsl::status.eq(token_acct_status.clone()),
token_accts::dsl::updated_at.eq(Utc::now()),
))
.get_result::<TokenAcct>(db)
}
10 changes: 8 additions & 2 deletions src/services/balances.rs
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,10 @@ pub async fn handle_token_acct_change(
token_accts::table
.filter(token_accts::token_acct.eq(token_balance.token_acct.clone())),
)
.set(token_accts::amount.eq(new_amount))
.set((
token_accts::amount.eq(new_amount),
token_accts::dsl::updated_at.eq(Utc::now()),
))
.execute(conn)
})
.await??;
Expand Down Expand Up @@ -208,7 +211,10 @@ pub async fn handle_token_acct_in_tx(
diesel::update(
token_accts::table.filter(token_accts::token_acct.eq(token_account_update)),
)
.set(token_accts::status.eq(TokenAcctStatus::Watching))
.set((
token_accts::status.eq(TokenAcctStatus::Watching),
token_accts::dsl::updated_at.eq(Utc::now()),
))
.execute(db)
})
.await??;
Expand Down
5 changes: 4 additions & 1 deletion src/services/transactions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,10 @@ pub async fn handle_token_acct_balance_tx(
diesel::update(
token_accts::table.filter(token_accts::token_acct.eq(token_acct_clone_4)),
)
.set(token_accts::amount.eq(new_balance))
.set((
token_accts::amount.eq(new_balance),
token_accts::dsl::updated_at.eq(Utc::now()),
))
.execute(db)
})
.await??;
Expand Down

0 comments on commit ab2aa8c

Please sign in to comment.