Skip to content

Commit

Permalink
Merge pull request #25 from metaDAOproject/staging
Browse files Browse the repository at this point in the history
Deploy Production
  • Loading branch information
LukasDeco authored Jul 23, 2024
2 parents 6d573a3 + 84518b4 commit c4a3cd5
Show file tree
Hide file tree
Showing 4 changed files with 175 additions and 63 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/build-and-test.yml
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
name: Rust
name: Build and Test

on:
push:
Expand Down
25 changes: 24 additions & 1 deletion src/adapters/rpc.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
use std::{env, sync::Arc};
use std::{env, str::FromStr, sync::Arc};

use solana_sdk::{commitment_config::CommitmentConfig, program_pack::Pack, pubkey::Pubkey};

pub async fn get_pubsub_client(
) -> Result<Arc<solana_client::nonblocking::pubsub_client::PubsubClient>, Box<dyn std::error::Error>>
Expand All @@ -8,3 +10,24 @@ pub async fn get_pubsub_client(
solana_client::nonblocking::pubsub_client::PubsubClient::new(&rpc_endpoint_ws).await?;
Ok(Arc::new(pub_sub_client))
}

pub async fn get_token_account_by_address(
rpc_client: Arc<solana_client::nonblocking::rpc_client::RpcClient>,
token_acct_address: String,
) -> Result<spl_token::state::Account, Box<dyn std::error::Error>> {
let token_acct_pubkey = Pubkey::from_str(&token_acct_address)?;
let account_data = rpc_client
.get_account_with_commitment(&token_acct_pubkey, CommitmentConfig::confirmed())
.await?;

if let Some(account) = account_data.value {
let token_account: spl_token::state::Account =
spl_token::state::Account::unpack(&account.data)?;
Ok(token_account)
} else {
Err(Box::new(std::io::Error::new(
std::io::ErrorKind::Other,
"could not find token acct",
)))
}
}
108 changes: 53 additions & 55 deletions src/entrypoints/events/rpc_token_acct_updates.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use std::env;
use std::sync::{Arc, MutexGuard};
use std::sync::Arc;

use chrono::Utc;
use deadpool::managed::Object;
Expand All @@ -8,11 +8,11 @@ use diesel::{update, ExpressionMethods, PgConnection, QueryDsl, RunQueryDsl};
use futures::StreamExt;
use solana_account_decoder::{UiAccount, UiAccountData};
use solana_client::{nonblocking::pubsub_client::PubsubClient, rpc_config::RpcAccountInfoConfig};
use solana_sdk::program_pack::Pack;
use solana_sdk::{commitment_config::CommitmentConfig, pubkey::Pubkey};
use std::sync::Mutex;
use tokio::task;

use crate::adapters;
use crate::entities::token_accts::token_accts::{self};
use crate::entities::token_accts::{TokenAcct, TokenAcctStatus};
use crate::entities::transactions::transactions::{self, tx_sig};
Expand Down Expand Up @@ -40,7 +40,6 @@ pub async fn new_handler(
}

let timeout_flag = Arc::new(Mutex::new(true));
let timeout_flag_arc = Arc::clone(&timeout_flag);

let account_subscribe_res = pub_sub_client
.account_subscribe(
Expand Down Expand Up @@ -126,60 +125,59 @@ async fn check_and_update_initial_balance(
token_acct_pubkey: &Pubkey,
token_acct_record: &TokenAcct,
) -> Result<(), Box<dyn std::error::Error>> {
let rpc_client = solana_client::nonblocking::rpc_client::RpcClient::new(rpc_endpoint);
let account_data = rpc_client
.get_account_with_commitment(token_acct_pubkey, CommitmentConfig::confirmed())
.await?;

if let Some(account) = account_data.value {
let token_account: spl_token::state::Account =
spl_token::state::Account::unpack(&account.data)?;
let balance = token_account.amount as i64;
let rpc_client = Arc::new(solana_client::nonblocking::rpc_client::RpcClient::new(
rpc_endpoint,
));
let token_account = adapters::rpc::get_token_account_by_address(
Arc::clone(&rpc_client),
token_acct_pubkey.to_string(),
)
.await?;
let balance = token_account.amount as i64;

if token_acct_record.amount != balance {
if token_acct_record.amount != balance {
if token_acct_record.amount != balance {
let latest_tx: Vec<
solana_client::rpc_response::RpcConfirmedTransactionStatusWithSignature,
> = rpc_client
.get_signatures_for_address(token_acct_pubkey)
.await?
.into_iter()
.filter(|tx| tx.err.is_none())
.collect();

if let Some(latest_tx_info) = latest_tx.first() {
let transaction_sig = latest_tx_info.signature.clone();
let transaction_sig_2 = latest_tx_info.signature.clone();
let transaction_exists: Option<Transaction> = conn_manager
.interact(move |db: &mut PgConnection| {
transactions::table
.filter(tx_sig.eq(transaction_sig.clone()))
.first::<Transaction>(db)
.optional()
})
.await??;

let slot = latest_tx_info.slot as i64;
let mint_acct = token_account.mint.to_string();
let owner_acct = token_account.owner.to_string();

let transaction_sig_option = if transaction_exists.is_some() {
Some(transaction_sig_2)
} else {
None
};

handle_token_acct_balance_tx(
conn_manager,
token_acct_pubkey.to_string(),
balance,
transaction_sig_option,
slot,
mint_acct,
owner_acct,
)
.await?;
}
let latest_tx: Vec<
solana_client::rpc_response::RpcConfirmedTransactionStatusWithSignature,
> = rpc_client
.get_signatures_for_address(token_acct_pubkey)
.await?
.into_iter()
.filter(|tx| tx.err.is_none())
.collect();

if let Some(latest_tx_info) = latest_tx.first() {
let transaction_sig = latest_tx_info.signature.clone();
let transaction_sig_2 = latest_tx_info.signature.clone();
let transaction_exists: Option<Transaction> = conn_manager
.interact(move |db: &mut PgConnection| {
transactions::table
.filter(tx_sig.eq(transaction_sig.clone()))
.first::<Transaction>(db)
.optional()
})
.await??;

let slot = latest_tx_info.slot as i64;
let mint_acct = token_account.mint.to_string();
let owner_acct = token_account.owner.to_string();

let transaction_sig_option = if transaction_exists.is_some() {
Some(transaction_sig_2)
} else {
None
};

handle_token_acct_balance_tx(
conn_manager,
token_acct_pubkey.to_string(),
balance,
transaction_sig_option,
slot,
mint_acct,
owner_acct,
)
.await?;
}
}
}
Expand Down
103 changes: 97 additions & 6 deletions src/entrypoints/http/post_watch_token_acct.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
use std::env;
use std::str::FromStr;
use std::sync::Arc;

use crate::entities::token_accts::token_accts;
Expand All @@ -12,6 +14,9 @@ use deadpool_diesel::Manager;
use diesel::prelude::*;
use diesel::update;
use diesel::PgConnection;
use solana_sdk::commitment_config::CommitmentConfig;
use solana_sdk::program_pack::Pack;
use spl_token::state::Account;
use warp::{reject::Reject, Reply};

#[derive(Debug)]
Expand All @@ -22,7 +27,7 @@ pub async fn handler(
reply_with_status: warp::reply::WithStatus<&'static str>,
message: WatchTokenBalancePayload,
conn_manager: Arc<Object<Manager<PgConnection>>>,
) -> Result<impl warp::Reply, warp::Rejection> {
) -> Result<warp::reply::WithStatus<warp::reply::Json>, warp::Rejection> {
let response = reply_with_status.into_response();
if !response.status().is_success() {
return Ok(warp::reply::with_status(
Expand All @@ -47,6 +52,7 @@ pub async fn handler(
.await;

let token_acct_for_update = token_acct_pubkey.clone();
let token_acct_for_insert = token_acct_pubkey.clone();
match token_acct_res {
Ok(Ok(Some(token_acct_record))) => {
// if already watching, we need to switch back to enabled and then back to make sure account subscribe reinits
Expand Down Expand Up @@ -96,12 +102,97 @@ pub async fn handler(
));
}
Ok(Ok(None)) => {
return Ok(warp::reply::with_status(
warp::reply::json(&WatchTokenBalanceResponse {
message: format!("could not find token_acct to update"),
}),
response.status(),
// add token acct creation here
let rpc_endpoint =
env::var("RPC_ENDPOINT_HTTP").expect("RPC_ENDPOINT_HTTP must be set");
let rpc_client = Arc::new(solana_client::nonblocking::rpc_client::RpcClient::new(
rpc_endpoint,
));
let token_acct_pubkey_str = token_acct_for_insert.clone();
let token_acct_pubkey =
match solana_sdk::pubkey::Pubkey::from_str(&token_acct_pubkey_str) {
Ok(pubkey) => pubkey,
Err(_) => {
return Ok(warp::reply::with_status(
warp::reply::json(&WatchTokenBalanceResponse {
message: "Invalid token account public key".to_string(),
}),
warp::http::StatusCode::BAD_REQUEST,
));
}
};

let account_res = rpc_client
.get_account_with_commitment(&token_acct_pubkey, CommitmentConfig::confirmed())
.await;

let account_data = match account_res {
Ok(data) => data,
Err(_) => {
return Ok(warp::reply::with_status(
warp::reply::json(&WatchTokenBalanceResponse {
message: "Failed to fetch account data".to_string(),
}),
warp::http::StatusCode::BAD_REQUEST,
));
}
};

if let Some(account) = account_data.value {
let token_account: Account = match Account::unpack(&account.data) {
Ok(account) => account,
Err(_) => {
return Ok(warp::reply::with_status(
warp::reply::json(&WatchTokenBalanceResponse {
message: "Failed to unpack token account data".to_string(),
}),
warp::http::StatusCode::BAD_REQUEST,
));
}
};

let new_token_acct = TokenAcct {
token_acct: token_acct_for_insert.clone(),
owner_acct: token_account.owner.to_string(),
amount: 0,
status: TokenAcctStatus::Watching,
mint_acct: token_account.mint.to_string(),
updated_at: Some(Utc::now()),
};

let new_token_acct_clone = new_token_acct.clone();

let insert_res = conn_manager
.interact(move |db| {
diesel::insert_into(token_accts::table)
.values(&new_token_acct_clone)
.execute(db)
})
.await;
return match insert_res {
Ok(_) => Ok(warp::reply::with_status(
warp::reply::json(&WatchTokenBalanceResponse {
message: format!("inserted new token acct: {}", token_acct_for_insert),
}),
warp::http::StatusCode::OK,
)),
_ => Ok(warp::reply::with_status(
warp::reply::json(&WatchTokenBalanceResponse {
message: format!("could not insert new token_acct to watch"),
}),
response.status(),
)),
};
} else {
return Ok(warp::reply::with_status(
warp::reply::json(&WatchTokenBalanceResponse {
message: format!(
"could not find token_acct spl data for new token balance insert"
),
}),
response.status(),
));
}
}
}

Expand Down

0 comments on commit c4a3cd5

Please sign in to comment.