Commit
HdfsRepository supports snowplow#54
liningalex committed Apr 29, 2016
1 parent 6d3db6f commit 7dbc036
Showing 5 changed files with 269 additions and 1 deletion.
4 changes: 4 additions & 0 deletions project/Dependencies.scala
@@ -30,6 +30,7 @@ object Dependencies {
val specs2 = "2.3.13" // Downgrade to prevent issues in job tests. WAS: "2.3.11"
val scalazSpecs2 = "0.2"
val mockito = "1.10.19"
val hadoop = "2.2.0"
}

object Libraries {
@@ -42,6 +43,9 @@ object Dependencies {
val json4sScalaz = "org.json4s" %% "json4s-scalaz" % V.json4s
val scalaz7 = "org.scalaz" %% "scalaz-core" % V.scalaz7
val collUtils = "com.twitter" %% "util-collection" % V.collUtils
// Hadoop HDFS
val hadoopCommon = "org.apache.hadoop" % "hadoop-common" % V.hadoop
val hadoopHdfs = "org.apache.hadoop" % "hadoop-hdfs" % V.hadoop
// Scala (test only)
val specs2 = "org.specs2" %% "specs2" % V.specs2 % "test"
val scalazSpecs2 = "org.typelevel" %% "scalaz-specs2" % V.scalazSpecs2 % "test"
5 changes: 4 additions & 1 deletion project/IgluScalaClientBuild.scala
@@ -40,7 +40,10 @@ object IgluScalaClientBuild extends Build {
// Scala (test only)
Libraries.specs2,
Libraries.scalazSpecs2,
Libraries.mockito
Libraries.mockito,
// Hadoop HDFS
Libraries.hadoopCommon,
Libraries.hadoopHdfs
)
)
}
161 changes: 161 additions & 0 deletions HdfsRepositoryRef.scala
@@ -0,0 +1,161 @@
/*
* Copyright (c) 2014 Snowplow Analytics Ltd. All rights reserved.
*
* This program is licensed to you under the Apache License Version 2.0,
* and you may not use this file except in compliance with the Apache License Version 2.0.
* You may obtain a copy of the Apache License Version 2.0 at http://www.apache.org/licenses/LICENSE-2.0.
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the Apache License Version 2.0 is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the Apache License Version 2.0 for the specific language governing permissions and limitations there under.
*/
package com.snowplowanalytics.iglu.client
package repositories

// Java
import java.io.{BufferedReader, InputStreamReader, IOException}

// Jackson
import com.fasterxml.jackson.core.JsonParseException
import com.fasterxml.jackson.databind.JsonNode
import com.github.fge.jackson.JsonLoader

// Scalaz
import scalaz._
import Scalaz._

// json4s
import org.json4s.scalaz.JsonScalaz._
import org.json4s._
import org.json4s.JsonDSL._
import org.json4s.jackson.JsonMethods._

// This project
import validation.ProcessingMessageMethods
import ProcessingMessageMethods._
import utils.{ValidationExceptions => VE}

// Hadoop
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

/**
* Helpers for constructing an HdfsRepositoryRef.
* See below for the definition.
*/
object HdfsRepositoryRef {

implicit val formats = DefaultFormats

/**
* Sniffs a config JSON to determine if this is
* an hdfs repository ref or not.
*
* @param config The configuration JSON to sniff
* @return true if this is the configuration for
* an HdfsRepositoryRef, else false
*/
def isHdfs(config: JValue): Boolean =
(config \ "connection" \ "hdfs").toSome.isDefined

/**
* Constructs an HdfsRepositoryRef
* from a JsonNode.
*
* @param config The JSON containing the configuration
* for this repository reference
* @return a configured reference to this hdfs
* repository
*/
def parse(config: JsonNode): ValidatedNel[HdfsRepositoryRef] =
parse(fromJsonNode(config))

/**
* Constructs an HdfsRepositoryRef
* from a JValue.
*
* @param config The JSON containing the configuration
* for this repository reference
* @return a configured reference to this hdfs
* repository
*/
def parse(config: JValue): ValidatedNel[HdfsRepositoryRef] = {
val conf = RepositoryRefConfig.parse(config)
val path = extractPath(config)
(conf |@| path.toValidationNel) { HdfsRepositoryRef(_, _) }
}

/**
* Returns the path to this hdfs repository.
*
* @param config The JSON containing the configuration
* for this repository reference
* @return the path to the hdfs repository on
* Success, or an error String on Failure
*/
private def extractPath(config: JValue): Validated[String] =
try {
(config \ "connection" \ "hdfs" \ "path").extract[String].success
} catch {
case me: MappingException => s"Could not extract connection.hdfs.path from ${compact(render(config))}".fail.toProcessingMessage
}

}

case class HdfsRepositoryRef(
override val config: RepositoryRefConfig,
path: String) extends RepositoryRef {

/**
* Prioritize searching this class of repository because
* it is low cost.
*/
override val classPriority: Int = 1

/**
* Human-readable descriptor for this
* type of repository ref.
*/
val descriptor = "hdfs"

/**
* Retrieves an IgluSchema from the Iglu Repo as
* a JsonNode.
*
* @param schemaKey The SchemaKey uniquely identifies
* the schema in Iglu
* @return a Validation boxing either the Schema's
* JsonNode on Success, or an error String
* on Failure
*/
// TODO: would be nice to abstract out fail.toProcessingMessage, and scrubbing
def lookupSchema(schemaKey: SchemaKey): Validated[Option[JsonNode]] = {
val schemaPath = s"${path}/schemas/${schemaKey.toPath}"
try {
val pt: Path = new Path(schemaPath)
val conf = new Configuration()
if (path.startsWith("hdfs:")) {
conf.addResource(new Path("/etc/hadoop/conf/core-site.xml"))
conf.addResource(new Path("/etc/hadoop/conf/hdfs-site.xml"))
}
// Resolve the FileSystem from the path's scheme, not just the configured default
val fs: FileSystem = pt.getFileSystem(conf)
val br = new BufferedReader(new InputStreamReader(fs.open(pt)))
JsonLoader.fromReader(br).some.success
} catch {
case jpe: JsonParseException => // Child of IOException so match first
s"Problem parsing ${schemaPath} as JSON in ${descriptor} Iglu repository ${config.name}: ${VE.stripInstanceEtc(jpe.getMessage)}".fail.toProcessingMessage
case _: IOException =>
None.success // Schema not found
case e: Throwable =>
s"Unknown problem reading and parsing ${schemaPath} in ${descriptor} Iglu repository ${config.name}: ${VE.getThrowableMessage(e)}".fail.toProcessingMessage
}
}
}
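
For orientation, here is a minimal sketch (not part of the commit) of how the new ref is used end to end: sniff a config, construct the ref, then look up a schema. The config JSON is the Acme example from the spec further down; the schema key is hypothetical and only illustrates the call.

import com.snowplowanalytics.iglu.client.SchemaKey
import com.snowplowanalytics.iglu.client.repositories.HdfsRepositoryRef
import org.json4s._
import org.json4s.jackson.JsonMethods.parse

// Repository config with an hdfs connection, as in the Acme example below
val config = parse(
"""{"name": "Acme Hdfs", "priority": 100,
"vendorPrefixes": [ "uk.co.acme", "de.acme" ],
"connection": { "hdfs": { "path": "/acme-hdfs-new" } }}""")

// isHdfs sniffs for connection.hdfs; parse yields a ValidatedNel[HdfsRepositoryRef]
if (HdfsRepositoryRef.isHdfs(config)) {
HdfsRepositoryRef.parse(config).foreach { repo =>
val key = SchemaKey("uk.co.acme", "example-event", "jsonschema", "1-0-0") // hypothetical
println(repo.lookupSchema(key))
}
}

A Success(None) from lookupSchema means the schema file was absent, which is exactly what the spec's e4 asserts below.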
3 changes: 3 additions & 0 deletions SpecHelpers.scala
@@ -30,6 +30,7 @@ import org.json4s.jackson.JsonMethods.{
import repositories.{
EmbeddedRepositoryRef,
HttpRepositoryRef,
HdfsRepositoryRef,
RepositoryRefConfig
}

@@ -40,6 +41,8 @@ object SpecHelpers {

val EmbeddedTest =
EmbeddedRepositoryRef(RepositoryRefConfig("Iglu Test Embedded", 0, List("com.snowplowanalytics")), path = "/iglu-test-embedded")
val HdfsTest =
HdfsRepositoryRef(RepositoryRefConfig("Iglu Test Hdfs", 0, List("com.snowplowanalytics")), path = "src/test/resources/iglu-test-embedded")

val TestResolver = Resolver(cacheSize = 10, EmbeddedTest)

97 changes: 97 additions & 0 deletions HdfsRepositoryRefSpec.scala
@@ -0,0 +1,97 @@
/*
* Copyright (c) 2014 Snowplow Analytics Ltd. All rights reserved.
*
* This program is licensed to you under the Apache License Version 2.0,
* and you may not use this file except in compliance with the Apache License Version 2.0.
* You may obtain a copy of the Apache License Version 2.0 at http://www.apache.org/licenses/LICENSE-2.0.
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the Apache License Version 2.0 is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the Apache License Version 2.0 for the specific language governing permissions and limitations there under.
*/
package com.snowplowanalytics.iglu.client
package repositories

// Scalaz
import scalaz._
import Scalaz._

// This project
import validation.ProcessingMessageMethods._

// Specs2
import org.specs2.Specification
import org.specs2.matcher.DataTables
import org.specs2.scalaz.ValidationMatchers

class HdfsRepositoryRefSpec extends Specification with DataTables with ValidationMatchers { def is =

"This is a specification to test an hdfs RepositoryRef" ^
p^
"a JSON configuration for an hdfs RepositoryRef should be recognized as such" ! e1^
"a JSON configuration can be used to construct an hdfs RepositoryRef" ! e2^
"retrieving an existent JSON Schema from an hdfs RepositoryRef should work" ! e3^
"requesting a non-existent JSON Schema from an hdfs RepositoryRef should return None" ! e4^
end

val AcmeConfig = SpecHelpers.asJValue(
"""|{
|"name": "Acme Hdfs",
|"priority": 100,
|"vendorPrefixes": [ "uk.co.acme", "de.acme" ],
|"connection": {
|"hdfs": {
|"path": "/acme-hdfs-new"
|}
|}
|}""".stripMargin.replaceAll("[\n\r]","")
)

def e1 = HdfsRepositoryRef.isHdfs(AcmeConfig) must beTrue

def e2 = {
val expected = HdfsRepositoryRef(
config = RepositoryRefConfig("Acme Hdfs", 100, List("uk.co.acme", "de.acme")),
path = "/acme-hdfs-new"
)
HdfsRepositoryRef.parse(AcmeConfig) must beSuccessful(expected)
}

def e3 = {
val schemaKey = SchemaKey("com.snowplowanalytics.iglu-test", "stock-item", "jsonschema", "1-0-0")
val expected =
"""{
|"$schema":"http://iglucentral.com/schemas/com.snowplowanalytics.self-desc/schema/jsonschema/1-0-0#",
|"description":"Test schema",
|"self":{
|"vendor":"com.snowplowanalytics.iglu-test",
|"name":"stock-item",
|"format":"jsonschema",
|"version":"1-0-0"
|},
|"type":"object",
|"properties":{
|"id":{
|"type":"string"
|},
|"name":{
|"type":"string"
|},
|"price":{
|"type":"number"
|}
|},
|"required":["id","name","price"],
|"additionalProperties":false
|}""".stripMargin.replaceAll("[\n\r]","")

SpecHelpers.HdfsTest.lookupSchema(schemaKey).map(_.map(_.toString)) must beSuccessful(Some(expected))
}

def e4 = {
val schemaKey = SchemaKey("com.acme.n-a", "null", "jsonschema", "1-0-0")
SpecHelpers.HdfsTest.lookupSchema(schemaKey) must beSuccessful(None)
}

}
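
As a follow-on sketch (also not part of this commit), the new ref can be wired into a Resolver the same way SpecHelpers wires the embedded ref (val TestResolver = Resolver(cacheSize = 10, EmbeddedTest)); this assumes the Resolver exposes lookupSchema for a SchemaKey, as the repository refs themselves do.

import com.snowplowanalytics.iglu.client.{Resolver, SchemaKey, SpecHelpers}

// Build a resolver over the hdfs-backed test ref added to SpecHelpers above
val hdfsResolver = Resolver(cacheSize = 10, SpecHelpers.HdfsTest)

// Resolve the stock-item schema exercised by e3 in this spec
val key = SchemaKey("com.snowplowanalytics.iglu-test", "stock-item", "jsonschema", "1-0-0")
val schema = hdfsResolver.lookupSchema(key)

Because HdfsRepositoryRef sets classPriority = 1, the resolver searches this class of repository early, reflecting its low lookup cost.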
