Skip to content

Commit

Permalink
feat: nearest gene lut table
Browse files Browse the repository at this point in the history
  • Loading branch information
mkarmona committed Oct 1, 2018
1 parent 69073c6 commit c594000
Show file tree
Hide file tree
Showing 4 changed files with 34 additions and 21 deletions.
1 change: 1 addition & 0 deletions src/main/resources/application.conf
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ input = "gs://genetics-portal-data"

variant-index {
path = ${output}"/variant-index/*"
nearest-genes = ${input}"/v2g/nearest_gene.tsv.gz"
}

ensembl {
Expand Down
2 changes: 1 addition & 1 deletion src/main/scala/ot/geckopipe/Configuration.scala
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ case class IntervalSection(path: String)

case class QTLSection(path: String)

case class VariantSection(path: String)
/** Variant-index settings.
  *
  * @param path         location of the variant index; VariantIndex.Builder.build strips a
  *                     trailing "*" from it to obtain the save path
  * @param nearestGenes location of the nearest-gene table handed to `loadFromCSV` by
  *                     VariantIndex.Builder.loadNearestGenes; presumably bound from the
  *                     `variant-index.nearest-genes` key of application.conf — confirm
  *                     against the configuration loader
  */
case class VariantSection(path: String, nearestGenes: String)

case class VariantGeneSection(path: String)

Expand Down
33 changes: 13 additions & 20 deletions src/main/scala/ot/geckopipe/Main.scala
Original file line number Diff line number Diff line change
Expand Up @@ -60,26 +60,19 @@ class Commands(val ss: SparkSession, val sampleFactor: Double, val c: Configurat
def dictionaries(): Unit = {
logger.info("exec variant-gene-luts command")

val vIdx = VariantIndex.builder(c).load

logger.info("write rs_id to chr-position")
vIdx.selectBy(Seq("rs_id", "chr_id", "position"))
.orderBy(col("rs_id").asc)
.distinct()
.write
.option("delimiter","\t")
.option("header", "false")
.csv(c.output.stripSuffix("/").concat("/v2g-lut-rsid/"))

logger.info("write gene name to chr position")
val _ = EnsemblIndex(c.ensembl.geneTranscriptPairs)
.aggByGene
.orderBy(col("gene_id").asc)
.write
.option("delimiter","\t")
.option("header", "false")
.csv(c.output.stripSuffix("/").concat("/v2g-lut-gene/"))

// Build the variant-index lookup table enriched with the nearest-gene columns and
// write it as JSON under <output>/variant-index-lut/.
val vIdxBuilder = VariantIndex.builder(c)
val vIdx = vIdxBuilder.load

// The write runs inside `Try.map`, so a non-fatal failure while joining or writing
// ends up in the resulting Failure instead of escaping this method.
val nearests = vIdxBuilder.loadNearestGenes.map { nearestGenes =>
  logger.info("generate variant index LUT with nearest genes (prot-cod and not prot-cod")
  // left outer join: keep every variant of the index even without a nearest-gene row
  vIdx.table.join(nearestGenes, VariantIndex.variantColumnNames, "left_outer")
    .write
    .json(c.output.stripSuffix("/").concat("/variant-index-lut/"))
}

nearests match {
  case scala.util.Success(_) => logger.info("generated variant index LUT")
  case scala.util.Failure(ex) =>
    // Pass the throwable itself: `getMessage` alone may be null and drops the stack trace.
    logger.error(s"failed to generate variant index LUT: ${ex.getMessage}", ex)
}
}

def summaryStats(): Unit = {
Expand Down
19 changes: 19 additions & 0 deletions src/main/scala/ot/geckopipe/index/VariantIndex.scala
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,13 @@ package ot.geckopipe.index
import com.typesafe.scalalogging.LazyLogging
import org.apache.spark.sql._
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
import org.apache.spark.storage.StorageLevel
import ot.geckopipe.functions.{loadFromCSV, splitVariantID}
import ot.geckopipe.{Configuration, VEP}

import scala.util.Try

/** represents a cached table of variants with all variant columns
*
* columns as chr_id, position, ref_allele, alt_allele, variant_id, rs_id. Also
Expand All @@ -26,6 +30,13 @@ object VariantIndex {
/** types of the columns named in variantColumnNames
  *
  * NOTE(review): the first entry is spelled "String" while the remaining entries are
  * lower-case ("long", "string", "string"); confirm whether whatever consumes this
  * list is case-sensitive.
  */
val variantColumnTypes: List[String] = List("String", "long", "string", "string")

/** Schema applied when reading the nearest-genes file.
  *
  * Five columns in this order: `varid`, `gene_id_prot_coding`,
  * `gene_id_prot_coding_distance`, `gene_id`, `gene_id_distance`. The three string
  * columns are declared non-nullable; the two distance columns are longs and keep
  * the default nullability of `StructField`.
  */
val nearestGenesSchema: StructType = StructType(Seq(
  StructField("varid", StringType, nullable = false),
  StructField("gene_id_prot_coding", StringType, nullable = false),
  StructField("gene_id_prot_coding_distance", LongType),
  StructField("gene_id", StringType, nullable = false),
  StructField("gene_id_distance", LongType)
))

/** builds a VariantIndex based on the given Configuration */
class Builder (val conf: Configuration, val ss: SparkSession) extends LazyLogging {
def load: VariantIndex = {
Expand All @@ -41,6 +52,14 @@ object VariantIndex {
}
}

/** Loads the nearest-genes table configured in `conf.variantIndex.nearestGenes`.
  *
  * The file is read with `nearestGenesSchema`, its `varid` column is handed to
  * `splitVariantID`, and on success the `varid` and both distance columns are
  * dropped before the frame is range-partitioned by `chr_id` and `position`
  * (presumably produced by `splitVariantID` — confirm against its definition).
  *
  * @return the prepared DataFrame wrapped in the `Try` returned by `splitVariantID`
  */
def loadNearestGenes: Try[DataFrame] = {
  val rawNearestGenes = loadFromCSV(conf.variantIndex.nearestGenes, nearestGenesSchema)(ss)
  // columns that are not needed once the variant id has been split
  val unusedColumns = Seq("varid", "gene_id_prot_coding_distance", "gene_id_distance")

  splitVariantID(rawNearestGenes, variantColName = "varid").map { splitDf =>
    splitDf
      .drop(unusedColumns: _*)
      .repartitionByRange(col("chr_id").asc, col("position").asc)
  }
}

def build: VariantIndex = {
logger.info("building variant index as specified in the configuration")
val savePath = conf.variantIndex.path.stripSuffix("*")
Expand Down

0 comments on commit c594000

Please sign in to comment.