From 7dbc0364f23f7efaea9c85c4a3d81f6836bbf1fd Mon Sep 17 00:00:00 2001
From: Lining Sun
Date: Fri, 29 Apr 2016 12:49:54 -0700
Subject: [PATCH] Add HdfsRepository support (#54)

---
 project/Dependencies.scala                   |   4 +
 project/IgluScalaClientBuild.scala           |   5 +-
 .../repositories/HdfsRepositoryRef.scala     | 159 ++++++++++++++++++
 .../SpecHelpers.scala                        |   3 +
 .../repositories/HdfsRepositoryRefSpec.scala |  97 +++++++++++
 5 files changed, 267 insertions(+), 1 deletion(-)
 create mode 100644 src/main/scala/com.snowplowanalytics.iglu/client/repositories/HdfsRepositoryRef.scala
 create mode 100644 src/test/scala/com.snowplowanalytics.iglu.client/repositories/HdfsRepositoryRefSpec.scala

diff --git a/project/Dependencies.scala b/project/Dependencies.scala
index c10aa35e..c964f73f 100644
--- a/project/Dependencies.scala
+++ b/project/Dependencies.scala
@@ -30,6 +30,7 @@ object Dependencies {
     val specs2 = "2.3.13" // Downgrade to prevent issues in job tests. WAS: "2.3.11"
     val scalazSpecs2 = "0.2"
     val mockito = "1.10.19"
+    val hadoop = "2.2.0"
   }
 
   object Libraries {
@@ -42,6 +43,9 @@ object Dependencies {
     val json4sScalaz = "org.json4s" %% "json4s-scalaz" % V.json4s
     val scalaz7 = "org.scalaz" %% "scalaz-core" % V.scalaz7
     val collUtils = "com.twitter" %% "util-collection" % V.collUtils
+    // Hadoop HDFS
+    val hadoopCommon = "org.apache.hadoop" % "hadoop-common" % V.hadoop
+    val hadoopHdfs = "org.apache.hadoop" % "hadoop-hdfs" % V.hadoop
     // Scala (test only)
     val specs2 = "org.specs2" %% "specs2" % V.specs2 % "test"
     val scalazSpecs2 = "org.typelevel" %% "scalaz-specs2" % V.scalazSpecs2 % "test"
diff --git a/project/IgluScalaClientBuild.scala b/project/IgluScalaClientBuild.scala
index c1c37ddd..2673136c 100644
--- a/project/IgluScalaClientBuild.scala
+++ b/project/IgluScalaClientBuild.scala
@@ -40,7 +40,10 @@ object IgluScalaClientBuild extends Build {
       // Scala (test only)
       Libraries.specs2,
       Libraries.scalazSpecs2,
-      Libraries.mockito
+      Libraries.mockito,
+      // Hadoop HDFS
+      Libraries.hadoopCommon,
+      Libraries.hadoopHdfs
     )
   )
 }
diff --git a/src/main/scala/com.snowplowanalytics.iglu/client/repositories/HdfsRepositoryRef.scala b/src/main/scala/com.snowplowanalytics.iglu/client/repositories/HdfsRepositoryRef.scala
new file mode 100644
index 00000000..e8c8ca4e
--- /dev/null
+++ b/src/main/scala/com.snowplowanalytics.iglu/client/repositories/HdfsRepositoryRef.scala
@@ -0,0 +1,159 @@
+/*
+ * Copyright (c) 2014 Snowplow Analytics Ltd. All rights reserved.
+ *
+ * This program is licensed to you under the Apache License Version 2.0,
+ * and you may not use this file except in compliance with the Apache License Version 2.0.
+ * You may obtain a copy of the Apache License Version 2.0 at http://www.apache.org/licenses/LICENSE-2.0.
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the Apache License Version 2.0 is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the Apache License Version 2.0 for the specific language governing permissions and limitations there under.
+ */
+package com.snowplowanalytics.iglu.client
+package repositories
+
+// Java
+import java.io.{InputStreamReader, BufferedReader, IOException}
+
+// Jackson
+import com.fasterxml.jackson.core.JsonParseException
+import com.fasterxml.jackson.databind.JsonNode
+import com.github.fge.jackson.JsonLoader
+
+// Scalaz
+import scalaz._
+import Scalaz._
+
+// json4s
+import org.json4s.scalaz.JsonScalaz._
+import org.json4s._
+import org.json4s.JsonDSL._
+import org.json4s.jackson.JsonMethods._
+
+// This project
+import validation.ProcessingMessageMethods
+import ProcessingMessageMethods._
+import utils.{ValidationExceptions => VE}
+
+// Hadoop
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.{FileSystem, Path}
+
+/**
+ * Helpers for constructing an HdfsRepository.
+ * See below for the definition.
+ */
+object HdfsRepositoryRef {
+
+  implicit val formats = DefaultFormats
+
+  /**
+   * Sniffs a config JSON to determine if this is
+   * an hdfs repository ref or not.
+   *
+   * @param config The configuration JSON to sniff
+   * @return true if this is the configuration for
+   *         an HdfsRepositoryRef, else false
+   */
+  def isHdfs(config: JValue): Boolean =
+    (config \ "connection" \ "hdfs").toSome.isDefined
+
+  /**
+   * Constructs an HdfsRepositoryRef
+   * from a JsonNode.
+   *
+   * @param config The JSON containing the configuration
+   *        for this repository reference
+   * @return a configured reference to this hdfs
+   *         repository
+   */
+  def parse(config: JsonNode): ValidatedNel[HdfsRepositoryRef] =
+    parse(fromJsonNode(config))
+
+  /**
+   * Constructs an HdfsRepositoryRef
+   * from a JValue.
+   *
+   * @param config The JSON containing the configuration
+   *        for this repository reference
+   * @return a configured reference to this hdfs
+   *         repository
+   */
+  def parse(config: JValue): ValidatedNel[HdfsRepositoryRef] = {
+    val conf = RepositoryRefConfig.parse(config)
+    val path = extractPath(config)
+    (conf |@| path.toValidationNel) { HdfsRepositoryRef(_, _) }
+  }
+
+  /**
+   * Returns the path to this hdfs repository.
+   *
+   * @param config The JSON containing the configuration
+   *        for this repository reference
+   * @return the path to the hdfs repository on
+   *         Success, or an error String on Failure
+   */
+  private def extractPath(config: JValue): Validated[String] =
+    try {
+      (config \ "connection" \ "hdfs" \ "path").extract[String].success
+    } catch {
+      case me: MappingException => s"Could not extract connection.hdfs.path from ${compact(render(config))}".fail.toProcessingMessage
+    }
+
+}
+
+case class HdfsRepositoryRef(
+  override val config: RepositoryRefConfig,
+  path: String) extends RepositoryRef {
+
+  /**
+   * Prioritize searching this class of repository because
+   * it is low cost.
+   */
+  override val classPriority: Int = 1
+
+  /**
+   * Human-readable descriptor for this
+   * type of repository ref.
+   */
+  val descriptor = "hdfs"
+
+  /**
+   * Retrieves an IgluSchema from the Iglu Repo as
+   * a JsonNode.
+   *
+   * @param schemaKey The SchemaKey uniquely identifies
+   *        the schema in Iglu
+   * @return a Validation boxing either the Schema's
+   *         JsonNode on Success, or an error String
+   *         on Failure
+   */
+  // TODO: would be nice to abstract out fail.toProcessingMessage, and scrubbing
+  def lookupSchema(schemaKey: SchemaKey): Validated[Option[JsonNode]] = {
+    val schemaPath = s"${path}/schemas/${schemaKey.toPath}"
+    try {
+      val pt: Path = new Path(schemaPath)
+      val conf = new Configuration()
+      if (path.startsWith("hdfs:")) {
+        // Pick up the cluster's settings from the default Hadoop configuration location
+        conf.addResource(new Path("/etc/hadoop/conf/core-site.xml"))
+        conf.addResource(new Path("/etc/hadoop/conf/hdfs-site.xml"))
+      }
+      val fs: FileSystem = FileSystem.get(conf)
+      val br = new BufferedReader(new InputStreamReader(fs.open(pt)))
+      try {
+        JsonLoader.fromReader(br).some.success
+      } finally {
+        br.close() // Avoid leaking the HDFS input stream
+      }
+    } catch {
+      case jpe: JsonParseException => // Child of IOException so match first
+        s"Problem parsing ${schemaPath} as JSON in ${descriptor} Iglu repository ${config.name}: %s".format(VE.stripInstanceEtc(jpe.getMessage)).fail.toProcessingMessage
+      case ioe: IOException =>
+        None.success // Schema not found
+      case e: Throwable =>
+        s"Unknown problem reading and parsing ${schemaPath} in ${descriptor} Iglu repository ${config.name}: ${VE.getThrowableMessage(e)}".fail.toProcessingMessage
+    }
+  }
+}
diff --git a/src/test/scala/com.snowplowanalytics.iglu.client/SpecHelpers.scala b/src/test/scala/com.snowplowanalytics.iglu.client/SpecHelpers.scala
index e216327b..dfee082a 100644
--- a/src/test/scala/com.snowplowanalytics.iglu.client/SpecHelpers.scala
+++ b/src/test/scala/com.snowplowanalytics.iglu.client/SpecHelpers.scala
@@ -30,6 +30,7 @@ import org.json4s.jackson.JsonMethods.{
 import repositories.{
   EmbeddedRepositoryRef,
   HttpRepositoryRef,
+  HdfsRepositoryRef,
   RepositoryRefConfig
 }
 
@@ -40,6 +41,8 @@ object SpecHelpers {
   val EmbeddedTest =
     EmbeddedRepositoryRef(RepositoryRefConfig("Iglu Test Embedded", 0, List("com.snowplowanalytics")), path = "/iglu-test-embedded")
+  val HdfsTest =
+    HdfsRepositoryRef(RepositoryRefConfig("Iglu Test Hdfs", 0, List("com.snowplowanalytics")), path = "src/test/resources/iglu-test-embedded")
 
   val TestResolver = Resolver(cacheSize = 10, EmbeddedTest)
diff --git a/src/test/scala/com.snowplowanalytics.iglu.client/repositories/HdfsRepositoryRefSpec.scala b/src/test/scala/com.snowplowanalytics.iglu.client/repositories/HdfsRepositoryRefSpec.scala
new file mode 100644
index 00000000..eb5061ee
--- /dev/null
+++ b/src/test/scala/com.snowplowanalytics.iglu.client/repositories/HdfsRepositoryRefSpec.scala
@@ -0,0 +1,97 @@
+/*
+ * Copyright (c) 2014 Snowplow Analytics Ltd. All rights reserved.
+ *
+ * This program is licensed to you under the Apache License Version 2.0,
+ * and you may not use this file except in compliance with the Apache License Version 2.0.
+ * You may obtain a copy of the Apache License Version 2.0 at http://www.apache.org/licenses/LICENSE-2.0.
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the Apache License Version 2.0 is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the Apache License Version 2.0 for the specific language governing permissions and limitations there under.
+ */
+package com.snowplowanalytics.iglu.client
+package repositories
+
+// Scalaz
+import scalaz._
+import Scalaz._
+
+// This project
+import validation.ProcessingMessageMethods._
+
+// Specs2
+import org.specs2.Specification
+import org.specs2.matcher.DataTables
+import org.specs2.scalaz.ValidationMatchers
+
+class HdfsRepositoryRefSpec extends Specification with DataTables with ValidationMatchers { def is =
+
+  "This is a specification to test an hdfs RepositoryRef"                               ^
+                                                                                        p^
+  "a JSON configuration for an hdfs RepositoryRef should be recognized as such"         ! e1^
+  "a JSON configuration can be used to construct an hdfs RepositoryRef"                 ! e2^
+  "retrieving an existent JSON Schema from an hdfs RepositoryRef should work"           ! e3^
+  "requesting a non-existent JSON Schema from an hdfs RepositoryRef should return None" ! e4^
+                                                                                        end
+
+  val AcmeConfig = SpecHelpers.asJValue(
+    """|{
+       |"name": "Acme Hdfs",
+       |"priority": 100,
+       |"vendorPrefixes": [ "uk.co.acme", "de.acme" ],
+       |"connection": {
+       |"hdfs": {
+       |"path": "/acme-hdfs-new"
+       |}
+       |}
+       |}""".stripMargin.replaceAll("[\n\r]","")
+  )
+
+  def e1 = HdfsRepositoryRef.isHdfs(AcmeConfig) must beTrue
+
+  def e2 = {
+    val expected = HdfsRepositoryRef(
+      config = RepositoryRefConfig("Acme Hdfs", 100, List("uk.co.acme", "de.acme")),
+      path = "/acme-hdfs-new"
+    )
+    HdfsRepositoryRef.parse(AcmeConfig) must beSuccessful(expected)
+  }
+
+  def e3 = {
+    val schemaKey = SchemaKey("com.snowplowanalytics.iglu-test", "stock-item", "jsonschema", "1-0-0")
+    val expected =
+      """{
+        |"$schema":"http://iglucentral.com/schemas/com.snowplowanalytics.self-desc/schema/jsonschema/1-0-0#",
+        |"description":"Test schema",
+        |"self":{
+        |"vendor":"com.snowplowanalytics.iglu-test",
+        |"name":"stock-item",
+        |"format":"jsonschema",
+        |"version":"1-0-0"
+        |},
+        |"type":"object",
+        |"properties":{
+        |"id":{
+        |"type":"string"
+        |},
+        |"name":{
+        |"type":"string"
+        |},
+        |"price":{
+        |"type":"number"
+        |}
+        |},
+        |"required":["id","name","price"],
+        |"additionalProperties":false
+        |}""".stripMargin.replaceAll("[\n\r]","")
+
+    SpecHelpers.HdfsTest.lookupSchema(schemaKey).map(_.map(_.toString)) must beSuccessful(Some(expected))
+  }
+
+  def e4 = {
+    val schemaKey = SchemaKey("com.acme.n-a", "null", "jsonschema", "1-0-0")
+    SpecHelpers.HdfsTest.lookupSchema(schemaKey) must beSuccessful(None)
+  }
+
+}
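
---

Usage sketch: a minimal example of how the new repository ref could be exercised, assuming only the API introduced above. The repository name, vendor prefix, schema key, and HDFS path are hypothetical placeholders, not values from this patch:

    import com.snowplowanalytics.iglu.client.SchemaKey
    import com.snowplowanalytics.iglu.client.repositories.{HdfsRepositoryRef, RepositoryRefConfig}

    // Construct a ref directly; equivalently, HdfsRepositoryRef.parse could be
    // fed a JSON config carrying the path under connection.hdfs.path, as in the spec
    val ref = HdfsRepositoryRef(
      RepositoryRefConfig("Acme Hdfs", 100, List("com.acme")), // hypothetical name and vendor prefix
      path = "hdfs:///iglu/registry"                           // hypothetical HDFS path
    )

    // Returns Validated[Option[JsonNode]]: Success(Some(schema)) when found,
    // Success(None) when the file is absent, and a Failure when the file
    // exists but is not valid JSON
    val result = ref.lookupSchema(SchemaKey("com.acme", "ad_click", "jsonschema", "1-0-0"))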