Alterations to DMS-related functions after PostgreSQL was adopted #43

Merged
merged 9 commits on Aug 14, 2024
14 changes: 8 additions & 6 deletions DESCRIPTION
@@ -8,25 +8,27 @@ Description: More about what it does (maybe more than one line)
Use four spaces when indenting paragraphs within the Description.
License: Artistic-2.0
Encoding: UTF-8
RoxygenNote: 7.3.1
RoxygenNote: 7.3.2
Imports:
curl,
data.table,
DBI,
dplyr,
glue,
MSnbase,
MSnID,
odbc,
MSnSet.utils,
parallel,
pbapply,
plyr,
RCurl,
readr,
RPostgres,
stats,
stringr,
tibble,
tidyr,
utils,
pbapply,
parallel,
MSnSet.utils
utils
Remotes:
github::PNNL-Comp-Mass-Spec/MSnID@pnnl-master,
github::PNNL-Comp-Mass-Spec/MSnSet.utils@master
13 changes: 7 additions & 6 deletions NAMESPACE
@@ -23,6 +23,11 @@ export(read_msfragger_data_from_DMS)
export(read_msgf_data_from_DMS)
export(read_study_design_from_DMS)
export(write_study_design_to_DMS)
importFrom(DBI,dbClearResult)
importFrom(DBI,dbConnect)
importFrom(DBI,dbDisconnect)
importFrom(DBI,dbFetch)
importFrom(DBI,dbSendQuery)
importFrom(MSnID,MSnID)
importFrom(MSnID,`psms<-`)
importFrom(MSnID,convert_msgf_output_to_msnid)
@@ -31,6 +36,7 @@ importFrom(MSnID,read_mzIDs)
importFrom(MSnSet.utils,read_FragPipe_LFQ)
importFrom(MSnbase,MSnSet)
importFrom(RCurl,url.exists)
importFrom(RPostgres,Postgres)
importFrom(curl,curl_download)
importFrom(data.table,`:=`)
importFrom(data.table,as.data.table)
@@ -54,12 +60,7 @@ importFrom(dplyr,select)
importFrom(dplyr,slice)
importFrom(dplyr,starts_with)
importFrom(dplyr,ungroup)
importFrom(odbc,dbClearResult)
importFrom(odbc,dbConnect)
importFrom(odbc,dbDisconnect)
importFrom(odbc,dbFetch)
importFrom(odbc,dbSendQuery)
importFrom(odbc,odbc)
importFrom(glue,glue)
importFrom(parallel,makeCluster)
importFrom(parallel,stopCluster)
importFrom(pbapply,pbwalk)
147 changes: 67 additions & 80 deletions R/PNNL_DMS_utils.R
@@ -46,8 +46,8 @@
#' or not. Default is FALSE.
#' @param ncores number of cores to use in cluster
#'
#' @importFrom odbc odbc dbConnect dbSendQuery dbFetch dbClearResult
#' dbDisconnect
#' @importFrom DBI dbConnect dbSendQuery dbFetch dbClearResult dbDisconnect
#' @importFrom RPostgres Postgres
#' @importFrom plyr ldply
#' @importFrom dplyr %>% rename filter select any_of mutate
#' @importFrom tidyr unnest
@@ -112,17 +112,31 @@ get_auth <- function(){
}
}

#' @importFrom RPostgres Postgres
get_db_connection <- function(
driver = Postgres,
driver_args = list(),
host = "prismdb2.emsl.pnl.gov",
user = "dmsreader",
password = "dms4fun",
dbname = "dms",
...) {
driver_object <- do.call(driver, driver_args)
con <- dbConnect(driver_object,
host = host,
user = user,
password = password,
dbname = dbname,
...
)
return(con)
}
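
For reference, a minimal usage sketch of the new helper, assuming the DBI generics imported above (the query itself is illustrative; v_mage_analysis_jobs is one of the DMS views used later in this file):

con <- get_db_connection()  # defaults target the DMS Postgres server
qry <- dbSendQuery(con, "SELECT * FROM v_mage_analysis_jobs LIMIT 5")  # illustrative query
jobs_preview <- dbFetch(qry)  # pull the rows into a data frame
dbClearResult(qry)            # release the result set
dbDisconnect(con)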

#' @export
#' @rdname pnnl_dms_utils
is_PNNL_DMS_connection_successful <- function()
{
con_str <- sprintf("DRIVER={%s};SERVER=gigasax;DATABASE=dms5;%s",
get_driver(),
get_auth())

con_test_res <- try(con <- dbConnect(odbc(), .connection_string=con_str),
TRUE)
con_test_res <- try(con <- get_db_connection(), TRUE)
if(inherits(con_test_res, "try-error")){
# no connection
return(FALSE)
@@ -133,6 +147,11 @@ is_PNNL_DMS_connection_successful <- function()
}
}
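
A hypothetical guard built on this check (the data package number below is made up for illustration):

if (is_PNNL_DMS_connection_successful()) {
  job_records <- get_job_records_by_dataset_package(3606)  # made-up package ID
} else {
  stop("Could not connect to the PNNL DMS Postgres server.")
}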

# Create and return a fresh, empty temporary directory. Unlike tempdir(),
# which reuses one per-session directory, each call yields a new directory,
# so repeated downloads cannot collide.
.new_tempdir <- function() {
tf <- tempfile()
dir.create(tf)
return(tf)
}


#' @export
@@ -159,29 +178,26 @@ get_dms_job_records <- function(
}

# initialize connection
con_str <- sprintf("DRIVER={%s};SERVER=gigasax;DATABASE=dms5;%s",
get_driver(),
get_auth())
con <- dbConnect(odbc(), .connection_string=con_str)
con <- get_db_connection()

# set up the query based on the job list
if(!is.null(jobs)){
strSQL = sprintf("SELECT *
FROM V_Mage_Analysis_Jobs
WHERE [Job] IN ('%s')
FROM v_mage_analysis_jobs
WHERE job IN ('%s')
",
paste(jobs,sep="",collapse="',\n'"))
}else{
strSQL = sprintf("SELECT *
FROM V_Mage_Analysis_Jobs
WHERE [Dataset] LIKE '%%%s%%'
AND [Experiment] LIKE '%%%s%%'
AND [Tool] LIKE '%%%s%%'
AND [Parameter_File] LIKE '%%%s%%'
AND [Settings_File] LIKE '%%%s%%'
AND [Protein Collection List] LIKE '%%%s%%'
AND [Protein Options] LIKE '%%%s%%'
AND [Instrument] LIKE '%%%s%%'
FROM v_mage_analysis_jobs
WHERE dataset LIKE '%%%s%%'
AND experiment LIKE '%%%s%%'
AND tool LIKE '%%%s%%'
AND parameter_file LIKE '%%%s%%'
AND settings_file LIKE '%%%s%%'
AND protein_collection_list LIKE '%%%s%%'
AND protein_options LIKE '%%%s%%'
AND instrument LIKE '%%%s%%'
",
datasetPttrn,
experimentPttrn,
@@ -207,7 +223,7 @@ get_tool_output_files_for_job_number <- function(jobNumber, toolName = NULL,
{
# get job records first. This will be useful to get dataset folder
jobRecord <- get_dms_job_records(jobNumber)
datasetFolder <- dirname( as.character(jobRecord$Folder))
datasetFolder <- dirname(as.character(jobRecord$folder))

# get tool's subfolder
if( is.null(toolName) ){
@@ -233,14 +249,11 @@
#' @rdname pnnl_dms_utils
get_output_folder_for_job_and_tool <- function(jobNumber, toolName, mostRecent=TRUE)
{
con_str <- sprintf("DRIVER={%s};SERVER=gigasax;DATABASE=DMS_Pipeline;%s",
get_driver(),
get_auth())
con <- dbConnect(odbc(), .connection_string=con_str)
strSQLPattern = "SELECT Output_Folder
FROM V_Job_Steps_History
WHERE (Job = %s) AND (Tool = '%s') AND (Most_Recent_Entry = 1)"
strSQL <- sprintf( strSQLPattern, jobNumber, toolName)
con <- get_db_connection()
strSQLPattern = "SELECT output_folder
FROM sw.v_job_steps_history
WHERE (job = %s) AND (tool = '%s') AND (most_recent_entry = 1)"
strSQL <- sprintf(strSQLPattern, jobNumber, toolName)
qry <- dbSendQuery(con, strSQL)
res <- dbFetch(qry)
dbClearResult(qry)
@@ -249,38 +262,18 @@
}
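
The connect/send/fetch/clear/disconnect sequence above recurs throughout this file. As a sketch (not part of this PR), it could be factored into a single internal helper:

.run_dms_query <- function(strSQL) {
  con <- get_db_connection()
  on.exit(dbDisconnect(con), add = TRUE)
  qry <- dbSendQuery(con, strSQL)
  # after = FALSE prepends, so the result is cleared before the connection closes
  on.exit(dbClearResult(qry), add = TRUE, after = FALSE)
  dbFetch(qry)
}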


# RODBC version
# #' @export
# #' @rdname pnnl_dms_utils
# get_job_records_by_dataset_package <- function(data_package_num)
# {
# con_str <- sprintf("DRIVER={%s};SERVER=gigasax;DATABASE=dms5;%s",
# get_driver(),
# get_auth())
# con <- odbcDriverConnect(con_str)
# strSQL = sprintf("
# SELECT *
# FROM V_Mage_Data_Package_Analysis_Jobs
# WHERE Data_Package_ID = %s",
# data_package_num)
# jr <- sqlQuery(con, strSQL, stringsAsFactors=FALSE)
# close(con)
# return(jr)
# }



# odbc/DBI version
#' @export
#' @rdname pnnl_dms_utils
get_job_records_by_dataset_package <- function(data_package_num)
{
con_str <- sprintf("DRIVER={%s};SERVER=gigasax;DATABASE=dms5;%s",
get_driver(),
get_auth())
con <- dbConnect(odbc(), .connection_string=con_str)
con <- get_db_connection()
strSQL <- sprintf("
SELECT *
FROM V_Mage_Data_Package_Analysis_Jobs
FROM v_mage_data_package_analysis_jobs
WHERE Data_Package_ID = %s",
data_package_num)
qry <- dbSendQuery(con, strSQL)
@@ -296,13 +289,10 @@
#' @rdname pnnl_dms_utils
get_datasets_by_data_package <- function(data_package_num)
{
con_str <- sprintf("DRIVER={%s};SERVER=gigasax;DATABASE=dms5;%s",
get_driver(),
get_auth())
con <- dbConnect(odbc(), .connection_string=con_str)
con <- get_db_connection()
strSQL <- sprintf("
SELECT *
FROM V_Mage_Data_Package_Datasets
FROM v_mage_data_package_datasets
WHERE Data_Package_ID = %s",
data_package_num)
qry <- dbSendQuery(con, strSQL)
@@ -321,21 +311,21 @@
ncores = 2)
{
dataset_tbl <- get_datasets_by_data_package(data_package_num) %>%
dplyr::select(Folder)
dplyr::select(folder)
if (.Platform$OS.type == "unix") {
pathToFile <- dataset_tbl %>%
dplyr::mutate(Folder = get_url_from_dir_and_file(Folder, fileNamePttrn))
dplyr::mutate(folder = get_url_from_dir_and_file(folder, fileNamePttrn))
} else if (.Platform$OS.type == "windows") {
pathToFile <- dataset_tbl %>%
dplyr::mutate(Folder = list.files(path=Folder,
dplyr::mutate(folder = list.files(path=folder,
pattern=fileNamePttrn,
full.names=TRUE))
} else {
stop("Unknown OS type.")
}
multiproc_cl <- makeCluster(ncores)
on.exit(stopCluster(multiproc_cl))
pbwalk(X = pathToFile$Folder, FUN = file.copy, cl = multiproc_cl, to = copy_to)
pbwalk(X = pathToFile$folder, FUN = file.copy, cl = multiproc_cl, to = copy_to)
}
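
A hypothetical call (all argument values are made up; fileNamePttrn and copy_to are the parameters referenced in the body above):

download_datasets_by_data_package(
  3606,                    # made-up data package ID
  fileNamePttrn = ".raw",  # pattern matched against files in each dataset folder
  copy_to = "~/dms_raw",   # hypothetical local destination
  ncores = 4
)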


@@ -347,11 +337,11 @@ get_results_for_multiple_jobs <- function(jobRecords){
(It is not compatible with macOS)")
}

toolName <- unique(jobRecords[["Tool"]])
toolName <- unique(jobRecords[["tool"]])
if (length(toolName) > 1){
stop("Contains results of more then one tool.")
}
results = ldply(jobRecords[["Folder"]],
results = ldply(jobRecords[["folder"]],
get_results_for_single_job,
fileNamePttrn=tool2suffix[[toolName]],
.progress = "text")
@@ -362,13 +352,13 @@
#' @export
#' @rdname pnnl_dms_utils
get_results_for_multiple_jobs.dt <- function(jobRecords, expected_multiple_files = FALSE){
toolName = unique(jobRecords[["Tool"]])
toolName = unique(jobRecords[["tool"]])
if (length(toolName) > 1) {
stop("Contains results of more than one tool.")
}
result <- list()
for (fileNamePttrn in tool2suffix[[toolName]]){
result[[fileNamePttrn]] <- llply(jobRecords[["Folder"]],
result[[fileNamePttrn]] <- llply(jobRecords[["folder"]],
get_results_for_single_job.dt,
fileNamePttrn = fileNamePttrn,
expected_multiple_files = expected_multiple_files,
@@ -505,13 +495,13 @@ path_to_FASTA_used_by_DMS <- function(data_package_num, organism_db = NULL)
# make sure it was the same fasta used for all msgf jobs
# at this point this works only with one data package at a time
jobRecords <- get_job_records_by_dataset_package(data_package_num)
# jobRecords <- jobRecords[grepl("MSGFPlus", jobRecords$Tool),]
# if(length(unique(jobRecords$`Organism DB`)) != 1){
# jobRecords <- jobRecords[grepl("MSGFPlus", jobRecords$tool),]
# if(length(unique(jobRecords$organism_db)) != 1){
# stop("There should be exactly one FASTA file per data package!")
# }

# All FASTA files
fasta_files <- setdiff(unique(jobRecords$`Organism DB`), "na")
fasta_files <- setdiff(unique(jobRecords$organism_db), "na")
if (length(fasta_files) == 0)
stop(paste0("There are no FASTA files in the package."))

@@ -526,24 +516,21 @@
organism_db <- fasta_files
}
# Filter to specific FASTA file
jobRecords <- jobRecords[jobRecords$`Organism DB` == organism_db, ]
jobRecords <- jobRecords[jobRecords$organism_db == organism_db, ]

strSQL <- sprintf("Select organism_db,
organism_db_storage_path
From V_Analysis_Job_Detail_Report_2
Where Job = %s", jobRecords$Job[1])
From v_analysis_job_detail_report_2
Where job = %s", jobRecords$job[1])

con_str <- sprintf("DRIVER={%s};SERVER=gigasax;DATABASE=dms5;%s",
get_driver(),
get_auth())

con <- dbConnect(odbc(), .connection_string=con_str)
con <- get_db_connection()
qry <- dbSendQuery(con, strSQL)
res <- dbFetch(qry)
dbClearResult(qry)
dbDisconnect(con)

temp_dir <- tempdir()
temp_dir <- .new_tempdir()

# OS-specific download
if (.Platform$OS.type == "unix") {
@@ -569,6 +556,6 @@


# Prevent "no visible binding for global variable" note.
utils::globalVariables(c("Tool", "value"))
utils::globalVariables(c("tool", "value"))
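
These names are used as bare column references inside dplyr pipelines, which R CMD check would otherwise flag. An illustrative instance of the pattern (not taken from this package):

# Without the globalVariables() declaration above, the bare `tool` here
# would trigger a "no visible binding for global variable" NOTE.
jobs_per_tool <- job_records %>% dplyr::count(tool)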

