diff --git a/src/main/resources/configs/deduplication_annotation.xml b/src/main/resources/configs/deduplication_annotation.xml
index 6e891fd8..e98eb892 100644
--- a/src/main/resources/configs/deduplication_annotation.xml
+++ b/src/main/resources/configs/deduplication_annotation.xml
@@ -12,7 +12,7 @@
true
500000
1
- 0.7
+ 0.5
diff --git a/src/main/scala/de/hpi/ingestion/dataimport/JSONParser.scala b/src/main/scala/de/hpi/ingestion/dataimport/JSONParser.scala
index bc4d9e2f..a381e205 100644
--- a/src/main/scala/de/hpi/ingestion/dataimport/JSONParser.scala
+++ b/src/main/scala/de/hpi/ingestion/dataimport/JSONParser.scala
@@ -145,6 +145,7 @@ trait JSONParser {
}
object JSONParser {
+ // scalastyle:off cyclomatic.complexity
def toJson[T](data: T): JsValue = data match {
case x: JsValue => x
case x: String => this(x)
@@ -153,9 +154,11 @@ object JSONParser {
case x: Boolean => this(x)
case x: Subject => this(x)
case x: List[Any] => this(x)
- case x: Map[Any, Any @unchecked] => this(x)
+ case x: Map[Any, Any] @unchecked => this(x)
+ case null => JsNull
case x => this(x.toString)
}
+ // scalastyle:on cyclomatic.complexity
def apply(data: String): JsValue = JsString(data)
def apply(data: Int): JsValue = JsNumber(data)
diff --git a/src/main/scala/de/hpi/ingestion/deduplication/Deduplication.scala b/src/main/scala/de/hpi/ingestion/deduplication/Deduplication.scala
index 86aa8ecf..ff08653c 100644
--- a/src/main/scala/de/hpi/ingestion/deduplication/Deduplication.scala
+++ b/src/main/scala/de/hpi/ingestion/deduplication/Deduplication.scala
@@ -17,12 +17,10 @@ limitations under the License.
package de.hpi.ingestion.deduplication
import com.datastax.spark.connector._
-import de.hpi.ingestion.dataimport.JSONParser
import de.hpi.ingestion.datalake.models._
import de.hpi.ingestion.deduplication.blockingschemes._
import de.hpi.ingestion.deduplication.models._
import de.hpi.ingestion.deduplication.models.config.AttributeConfig
-import de.hpi.ingestion.deduplication.similarity.SimilarityMeasure
import de.hpi.ingestion.framework.SparkJob
import org.apache.spark.SparkContext
import org.apache.spark.broadcast.Broadcast
@@ -48,7 +46,6 @@ class Deduplication extends SparkJob {
var subjects: RDD[Subject] = _
var stagedSubjects: RDD[Subject] = _
var duplicates: RDD[Duplicates] = _
- var duplicates2: RDD[String] = _
// $COVERAGE-OFF$
/**
@@ -65,8 +62,7 @@ class Deduplication extends SparkJob {
* @param sc SparkContext to be used for the job
*/
override def save(sc: SparkContext): Unit = {
-// duplicates.saveToCassandra(settings("keyspaceDuplicatesTable"), settings("duplicatesTable"))
- duplicates2.saveAsTextFile(s"deduplication_training_data_${System.currentTimeMillis()}")
+ duplicates.saveToCassandra(settings("keyspaceDuplicatesTable"), settings("duplicatesTable"))
}
// $COVERAGE-ON$
@@ -80,7 +76,7 @@ class Deduplication extends SparkJob {
settings.get("minBlockSize").foreach(blocking.setMinBlockSize)
settings.get("filterUndefined").foreach(blocking.setFilterUndefined)
settings.get("filterSmall").foreach(blocking.setFilterSmall)
- blocking.subjects = subjects//.filter(_.isSlave)
+ blocking.subjects = subjects.filter(_.isSlave)
blocking.stagedSubjects = stagedSubjects
blocking.setPartitioning(sc)
blocking.subjectReductionFunction = (subject: Subject) => {
@@ -95,14 +91,9 @@ class Deduplication extends SparkJob {
// TODO: set blocking schemes
val blocks = blocking.blocking().values
- val unnormalizedScoreConfig = scoreConfigSettings
- .map { aConf =>
- val scoreConf = aConf.scoreConfigs.map(_.copy[String, SimilarityMeasure[String]](weight = 1.0))
- aConf.copy(scoreConfigs = scoreConf)
- }
- val scoreConfigBroadcast = sc.broadcast(unnormalizedScoreConfig)
- val subjectPairs = findDuplicates2(blocks, settings("confidence").toDouble, scoreConfigBroadcast)
- duplicates2 = createDuplicates2(subjectPairs, settings("stagingTable"))
+ val scoreConfigBroadcast = sc.broadcast(scoreConfigSettings)
+ val subjectPairs = findDuplicates(blocks, settings("confidence").toDouble, scoreConfigBroadcast)
+ duplicates = createDuplicates(subjectPairs, settings("stagingTable"))
}
}
@@ -129,22 +120,6 @@ object Deduplication {
scores.sum / scores.length
}
- def compare2(
- subject1: Subject,
- subject2: Subject,
- attributeConfigs: List[AttributeConfig] = Nil,
- scale: Int = 1
- ): (Double, List[Double]) = {
- val scores = for {
- AttributeConfig(attribute, weight, configs) <- attributeConfigs
- subjectValues = subject1.get(attribute)
- stagingValues = subject2.get(attribute)
- if subjectValues.nonEmpty && stagingValues.nonEmpty
- config <- configs
- } yield CompareStrategy(attribute)(subjectValues, stagingValues, config) * weight
- (scores.sum / scores.length, scores)
- }
-
/**
* Groups all found duplicates by the Subject whose duplicate they are and creates the corresponding
* DuplicateCandidates.
@@ -163,40 +138,6 @@ object Deduplication {
.map { case ((id, name), candidates) => Duplicates(id, name, stagingTable, candidates.distinct) }
}
- def createDuplicates2(
- subjectPairs: RDD[(Subject, Subject, List[Double], Double)],
- stagingTable: String
- ): RDD[String] = {
- subjectPairs
- .map { case (subject, staging, scores, mean) =>
- (staging.id, List((subject, staging, scores, mean)))
- }.reduceByKey(_ ::: _)
- .map { case (stagingId, subjectPairs) =>
- val staging = subjectPairs.head._2
- val subjects = subjectPairs.map { case (subject, staging, scores, mean) =>
- Map(
- "mean" -> JSONParser(mean),
- "scores" -> JSONParser(scores),
- "subject" -> JSONParser(subject)
-// "mean" -> mean,
-// "scores" -> scores,
-// "subject" -> subject
- )
- }
- val duplicateData = Map(
- "staging" -> JSONParser(staging),
- "staging_source_table" -> JSONParser(stagingTable),
- "subject_source_table" -> JSONParser("implisense"),
- "subjects" -> JSONParser(subjects)
-// "staging" -> staging,
-// "staging_source_table" -> stagingTable,
-// "subject_source_table" -> "subject_implisense",
-// "subjects" -> subjects
- )
- JSONParser(duplicateData).toString
- }
- }
-
/**
* Finds the duplicates of each block by comparing the Subjects and filtering all Subjects pairs below the
* threshold confidence.
@@ -210,32 +151,12 @@ object Deduplication {
confidence: Double,
scoreConfigBroadcast: Broadcast[List[AttributeConfig]]
): RDD[(Subject, Subject, Double)] = {
- blocks.flatMap { block =>
- val filterFunction = (s1: Subject, s2: Subject) =>
- compare(s1, s2, scoreConfigBroadcast.value) >= confidence
- block
- .crossProduct(filterFunction)
- .map { case (subject1, subject2) =>
- val score = compare(subject1, subject2, scoreConfigBroadcast.value)
- (subject1, subject2, score)
- }
- }
- }
-
- def findDuplicates2(
- blocks: RDD[Block],
- confidence: Double,
- scoreConfigBroadcast: Broadcast[List[AttributeConfig]]
- ): RDD[(Subject, Subject, List[Double], Double)] = {
- blocks.flatMap { block =>
- val filterFunction = (s1: Subject, s2: Subject) =>
- compare(s1, s2, scoreConfigBroadcast.value) >= confidence
- block
- .crossProduct(filterFunction)
- .map { case (subject1, subject2) =>
- val (mean, scores) = compare2(subject1, subject2, scoreConfigBroadcast.value)
- (subject1, subject2, scores, mean)
- }
- }
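+ // compare every Subject pair of each block, keep only the pairs that reach the confidence threshold and rebalance them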
+ blocks
+ .flatMap(_.crossProduct())
+ .map { case (subject1, subject2) =>
+ val score = compare(subject1, subject2, scoreConfigBroadcast.value)
+ (subject1, subject2, score)
+ }.filter(_._3 >= confidence)
+ .repartition(64)
}
}
diff --git a/src/main/scala/de/hpi/ingestion/deduplication/DeduplicationCandidateExport.scala b/src/main/scala/de/hpi/ingestion/deduplication/DeduplicationCandidateExport.scala
new file mode 100644
index 00000000..a6032fc8
--- /dev/null
+++ b/src/main/scala/de/hpi/ingestion/deduplication/DeduplicationCandidateExport.scala
@@ -0,0 +1,185 @@
+/*
+Copyright 2016-17, Hasso-Plattner-Institut fuer Softwaresystemtechnik GmbH
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package de.hpi.ingestion.deduplication
+
+import com.datastax.spark.connector._
+import de.hpi.ingestion.dataimport.JSONParser
+import de.hpi.ingestion.datalake.models.Subject
+import de.hpi.ingestion.deduplication.blockingschemes._
+import de.hpi.ingestion.deduplication.models.Block
+import de.hpi.ingestion.deduplication.models.config.AttributeConfig
+import de.hpi.ingestion.deduplication.similarity.SimilarityMeasure
+import de.hpi.ingestion.framework.SparkJob
+import org.apache.spark.SparkContext
+import org.apache.spark.broadcast.Broadcast
+import org.apache.spark.rdd.RDD
+
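+/**
+ * Spark job that finds duplicate candidates between the Subjects and the staged Subjects and exports the candidate
+ * groups as JSON text files, which serve as training data for the deduplication annotation.
+ */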
+class DeduplicationCandidateExport extends SparkJob {
+ sparkOptions("spark.yarn.executor.memoryOverhead") = "8192"
+
+ appName = "Deduplication Candidate Export"
+ configFile = "deduplication_annotation.xml"
+ val blockingSchemes = List[BlockingScheme](
+ SimpleBlockingScheme("simple_scheme"),
+ LastLettersBlockingScheme("lastLetter_scheme"),
+ MappedListBlockingScheme("mappedSectors_scheme", x => x.slice(0, 4), "gen_sectors"),
+ MappedListBlockingScheme("mappedPostal_scheme", x => x.slice(0, 3), "geo_postal")
+ )
+
+ var subjects: RDD[Subject] = _
+ var stagedSubjects: RDD[Subject] = _
+ var duplicatesJson: RDD[String] = _
+
+ // $COVERAGE-OFF$
+ /**
+ * Loads the Subjects and the staged Subjects from Cassandra.
+ * @param sc SparkContext to be used for the job
+ */
+ override def load(sc: SparkContext): Unit = {
+ subjects = sc.cassandraTable[Subject](settings("keyspaceSubjectTable"), settings("subjectTable"))
+ stagedSubjects = sc.cassandraTable[Subject](settings("keyspaceStagingTable"), settings("stagingTable"))
+ }
+
+ /**
+ * Saves the serialized duplicate candidate groups as text files.
+ * @param sc SparkContext to be used for the job
+ */
+ override def save(sc: SparkContext): Unit = {
+ duplicatesJson.saveAsTextFile(s"deduplication_training_data_${System.currentTimeMillis()}")
+ }
+ // $COVERAGE-ON$
+
+ /**
+ * Blocks the Subjects and the staged Subjects and finds duplicates between them.
+ * @param sc Spark Context used to e.g. broadcast variables
+ */
+ override def run(sc: SparkContext): Unit = {
+ val blocking = new Blocking
+ settings.get("maxBlockSize").foreach(blocking.setMaxBlockSize)
+ settings.get("minBlockSize").foreach(blocking.setMinBlockSize)
+ settings.get("filterUndefined").foreach(blocking.setFilterUndefined)
+ settings.get("filterSmall").foreach(blocking.setFilterSmall)
+ val comparingWithMasterSubjects = settings("subjectTable") == "subject"
+ blocking.subjects = subjects.filter(_.isSlave || !comparingWithMasterSubjects)
+ blocking.stagedSubjects = stagedSubjects
+ blocking.setPartitioning(sc)
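+ // the reduction function drops the history fields of every Subject before blocking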
+ blocking.subjectReductionFunction = (subject: Subject) => {
+ subject.master_history = Nil
+ subject.name_history = Nil
+ subject.category_history = Nil
+ subject.properties_history = Map()
+ subject.relations_history = Map()
+ subject.aliases_history = Nil
+ subject
+ }
+
+ // TODO: set blocking schemes
+ val blocks = blocking.blocking().values
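+ // reset the attribute and similarity measure weights to 1.0 so the exported scores are unweighted similarity values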
+ val unnormalizedScoreConfig = scoreConfigSettings
+ .map { aConf =>
+ val scoreConf = aConf.scoreConfigs.map(_.copy[String, SimilarityMeasure[String]](weight = 1.0))
+ aConf.copy(scoreConfigs = scoreConf, weight = 1.0)
+ }
+ val scoreConfigBroadcast = sc.broadcast(unnormalizedScoreConfig)
+ val subjectPairs = findDuplicates(blocks, settings("confidence").toDouble, scoreConfigBroadcast)
+ duplicatesJson = createDuplicates(subjectPairs, settings("subjectTable"), settings("stagingTable"))
+ }
+
+ /**
+ * Compares two subjects according to the configuration and returns the scores for each similarity measure as well
+ * as the mean of those scores.
+ * @param subject1 the first Subject of the comparison
+ * @param subject2 the second Subject of the comparison
+ * @return tuple of the mean of the similarity scores and a list of the scores
+ */
+ def compare(
+ subject1: Subject,
+ subject2: Subject,
+ attributeConfigs: List[AttributeConfig] = Nil,
+ scale: Int = 1
+ ): (Double, List[Double]) = {
+ val scores = for {
+ AttributeConfig(attribute, weight, configs) <- attributeConfigs
+ subjectValues = subject1.get(attribute)
+ stagingValues = subject2.get(attribute)
+ if subjectValues.nonEmpty && stagingValues.nonEmpty
+ config <- configs
+ } yield CompareStrategy(attribute)(subjectValues, stagingValues, config) * weight
+ (scores.sum / scores.length, scores)
+ }
+
+ /**
+ * Groups all found duplicates by the staged Subjects. These groups are then serialized into JSON.
+ * @param subjectPairs RDD containing all found duplicates and their scores
+ * @param sourceTable source table of the existing Subjects
+ * @param stagingTable source table of the staged Subjects
+ * @return RDD of serialized duplicate groups
+ */
+ def createDuplicates(
+ subjectPairs: RDD[(Subject, Subject, List[Double], Double)],
+ sourceTable: String,
+ stagingTable: String
+ ): RDD[String] = {
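+ // group all duplicate candidates by the staged Subject they were found for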
+ subjectPairs
+ .map { case (subject, staging, scores, mean) =>
+ (staging.id, List((subject, staging, scores, mean)))
+ }.reduceByKey(_ ::: _)
+ .values
+ .map { duplicateList =>
+ val staging = duplicateList.head._2
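+ // keep only the ten highest scoring duplicate candidates per staged Subject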
+ val subjects = duplicateList
+ .sortBy { case (subject, staging, scores, mean) => -mean }
+ .take(10)
+ .map { case (subject, staging, scores, mean) =>
+ Map(
+ "mean" -> mean,
+ "scores" -> scores,
+ "subject" -> subject
+ )
+ }
+ val duplicateData = Map(
+ "staging" -> staging,
+ "staging_source_table" -> stagingTable,
+ "subject_source_table" -> sourceTable,
+ "subjects" -> subjects
+ )
+ JSONParser(duplicateData).toString
+ }
+ }
+
+ /**
+ * Finds the duplicates of each block by comparing the Subjects and filtering all Subject pairs below the
+ * threshold confidence. The comparison scores and their mean are added to all Subject pairs that are kept.
+ * @param blocks RDD of Blocks produced by the Blocking
+ * @param confidence threshold used to filter duplicate candidates
+ * @param scoreConfigBroadcast Broadcast of the score config
+ * @return RDD of Subject pairs with the scores of the similarity measures and their mean
+ */
+ def findDuplicates(
+ blocks: RDD[Block],
+ confidence: Double,
+ scoreConfigBroadcast: Broadcast[List[AttributeConfig]]
+ ): RDD[(Subject, Subject, List[Double], Double)] = {
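+ // compare every Subject pair of each block, keep only the pairs whose mean score reaches the confidence threshold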
+ blocks
+ .flatMap(_.crossProduct())
+ .map { case (subject1, subject2) =>
+ val (mean, scores) = compare(subject1, subject2, scoreConfigBroadcast.value)
+ (subject1, subject2, scores, mean)
+ }.filter(_._4 >= confidence)
+ .repartition(64)
+ }
+}
diff --git a/src/test/scala/de/hpi/ingestion/deduplication/DeduplicationCandidateExportTest.scala b/src/test/scala/de/hpi/ingestion/deduplication/DeduplicationCandidateExportTest.scala
new file mode 100644
index 00000000..f4e798dd
--- /dev/null
+++ b/src/test/scala/de/hpi/ingestion/deduplication/DeduplicationCandidateExportTest.scala
@@ -0,0 +1,31 @@
+/*
+Copyright 2016-17, Hasso-Plattner-Institut fuer Softwaresystemtechnik GmbH
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package de.hpi.ingestion.deduplication
+
+import com.holdenkarau.spark.testing.SharedSparkContext
+import org.scalatest.{FlatSpec, Matchers}
+
+class DeduplicationCandidateExportTest extends FlatSpec with SharedSparkContext with Matchers {
+ "Candidates" should "be exported" in {
+ val job = new DeduplicationCandidateExport
+ job.subjects = sc.parallelize(TestData.subjects)
+ job.stagedSubjects = sc.parallelize(TestData.stagings)
+ job.run(sc)
+ val duplicates = job.duplicatesJson.collect.toSet
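+ // createDuplicates emits one JSON line per staged Subject that has at least one duplicate candidate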
+ duplicates should have size 2
+ }
+}