From 7233023d8c59d0b9f9aad7e0cdd35b3d99e6c69f Mon Sep 17 00:00:00 2001 From: Jan Ehmueller Date: Thu, 23 Aug 2018 14:52:10 +0200 Subject: [PATCH] Refs #646: finish annotation export job --- .../configs/deduplication_annotation.xml | 2 +- .../hpi/ingestion/dataimport/JSONParser.scala | 5 +- .../deduplication/Deduplication.scala | 103 ++-------- .../DeduplicationCandidateExport.scala | 185 ++++++++++++++++++ .../DeduplicationCandidateExportTest.scala | 31 +++ 5 files changed, 233 insertions(+), 93 deletions(-) create mode 100644 src/main/scala/de/hpi/ingestion/deduplication/DeduplicationCandidateExport.scala create mode 100644 src/test/scala/de/hpi/ingestion/deduplication/DeduplicationCandidateExportTest.scala diff --git a/src/main/resources/configs/deduplication_annotation.xml b/src/main/resources/configs/deduplication_annotation.xml index 6e891fd8..e98eb892 100644 --- a/src/main/resources/configs/deduplication_annotation.xml +++ b/src/main/resources/configs/deduplication_annotation.xml @@ -12,7 +12,7 @@ true 500000 1 - 0.7 + 0.5 diff --git a/src/main/scala/de/hpi/ingestion/dataimport/JSONParser.scala b/src/main/scala/de/hpi/ingestion/dataimport/JSONParser.scala index bc4d9e2f..a381e205 100644 --- a/src/main/scala/de/hpi/ingestion/dataimport/JSONParser.scala +++ b/src/main/scala/de/hpi/ingestion/dataimport/JSONParser.scala @@ -145,6 +145,7 @@ trait JSONParser { } object JSONParser { + // scalastyle:off cyclomatic.complexity def toJson[T](data: T): JsValue = data match { case x: JsValue => x case x: String => this(x) @@ -153,9 +154,11 @@ object JSONParser { case x: Boolean => this(x) case x: Subject => this(x) case x: List[Any] => this(x) - case x: Map[Any, Any @unchecked] => this(x) + case x: Map[Any, Any] @unchecked => this(x) + case null => JsNull case x => this(x.toString) } + // scalastyle:on cyclomatic.complexity def apply(data: String): JsValue = JsString(data) def apply(data: Int): JsValue = JsNumber(data) diff --git a/src/main/scala/de/hpi/ingestion/deduplication/Deduplication.scala b/src/main/scala/de/hpi/ingestion/deduplication/Deduplication.scala index 86aa8ecf..ff08653c 100644 --- a/src/main/scala/de/hpi/ingestion/deduplication/Deduplication.scala +++ b/src/main/scala/de/hpi/ingestion/deduplication/Deduplication.scala @@ -17,12 +17,10 @@ limitations under the License. 
package de.hpi.ingestion.deduplication import com.datastax.spark.connector._ -import de.hpi.ingestion.dataimport.JSONParser import de.hpi.ingestion.datalake.models._ import de.hpi.ingestion.deduplication.blockingschemes._ import de.hpi.ingestion.deduplication.models._ import de.hpi.ingestion.deduplication.models.config.AttributeConfig -import de.hpi.ingestion.deduplication.similarity.SimilarityMeasure import de.hpi.ingestion.framework.SparkJob import org.apache.spark.SparkContext import org.apache.spark.broadcast.Broadcast @@ -48,7 +46,6 @@ class Deduplication extends SparkJob { var subjects: RDD[Subject] = _ var stagedSubjects: RDD[Subject] = _ var duplicates: RDD[Duplicates] = _ - var duplicates2: RDD[String] = _ // $COVERAGE-OFF$ /** @@ -65,8 +62,7 @@ class Deduplication extends SparkJob { * @param sc SparkContext to be used for the job */ override def save(sc: SparkContext): Unit = { -// duplicates.saveToCassandra(settings("keyspaceDuplicatesTable"), settings("duplicatesTable")) - duplicates2.saveAsTextFile(s"deduplication_training_data_${System.currentTimeMillis()}") + duplicates.saveToCassandra(settings("keyspaceDuplicatesTable"), settings("duplicatesTable")) } // $COVERAGE-ON$ @@ -80,7 +76,7 @@ class Deduplication extends SparkJob { settings.get("minBlockSize").foreach(blocking.setMinBlockSize) settings.get("filterUndefined").foreach(blocking.setFilterUndefined) settings.get("filterSmall").foreach(blocking.setFilterSmall) - blocking.subjects = subjects//.filter(_.isSlave) + blocking.subjects = subjects.filter(_.isSlave) blocking.stagedSubjects = stagedSubjects blocking.setPartitioning(sc) blocking.subjectReductionFunction = (subject: Subject) => { @@ -95,14 +91,9 @@ class Deduplication extends SparkJob { // TODO: set blocking schemes val blocks = blocking.blocking().values - val unnormalizedScoreConfig = scoreConfigSettings - .map { aConf => - val scoreConf = aConf.scoreConfigs.map(_.copy[String, SimilarityMeasure[String]](weight = 1.0)) - aConf.copy(scoreConfigs = scoreConf) - } - val scoreConfigBroadcast = sc.broadcast(unnormalizedScoreConfig) - val subjectPairs = findDuplicates2(blocks, settings("confidence").toDouble, scoreConfigBroadcast) - duplicates2 = createDuplicates2(subjectPairs, settings("stagingTable")) + val scoreConfigBroadcast = sc.broadcast(scoreConfigSettings) + val subjectPairs = findDuplicates(blocks, settings("confidence").toDouble, scoreConfigBroadcast) + duplicates = createDuplicates(subjectPairs, settings("stagingTable")) } } @@ -129,22 +120,6 @@ object Deduplication { scores.sum / scores.length } - def compare2( - subject1: Subject, - subject2: Subject, - attributeConfigs: List[AttributeConfig] = Nil, - scale: Int = 1 - ): (Double, List[Double]) = { - val scores = for { - AttributeConfig(attribute, weight, configs) <- attributeConfigs - subjectValues = subject1.get(attribute) - stagingValues = subject2.get(attribute) - if subjectValues.nonEmpty && stagingValues.nonEmpty - config <- configs - } yield CompareStrategy(attribute)(subjectValues, stagingValues, config) * weight - (scores.sum / scores.length, scores) - } - /** * Groups all found duplicates by the Subject whose duplicate they are and creates the corresponding * DuplicateCandidates. 
@@ -163,40 +138,6 @@ object Deduplication { .map { case ((id, name), candidates) => Duplicates(id, name, stagingTable, candidates.distinct) } } - def createDuplicates2( - subjectPairs: RDD[(Subject, Subject, List[Double], Double)], - stagingTable: String - ): RDD[String] = { - subjectPairs - .map { case (subject, staging, scores, mean) => - (staging.id, List((subject, staging, scores, mean))) - }.reduceByKey(_ ::: _) - .map { case (stagingId, subjectPairs) => - val staging = subjectPairs.head._2 - val subjects = subjectPairs.map { case (subject, staging, scores, mean) => - Map( - "mean" -> JSONParser(mean), - "scores" -> JSONParser(scores), - "subject" -> JSONParser(subject) -// "mean" -> mean, -// "scores" -> scores, -// "subject" -> subject - ) - } - val duplicateData = Map( - "staging" -> JSONParser(staging), - "staging_source_table" -> JSONParser(stagingTable), - "subject_source_table" -> JSONParser("implisense"), - "subjects" -> JSONParser(subjects) -// "staging" -> staging, -// "staging_source_table" -> stagingTable, -// "subject_source_table" -> "subject_implisense", -// "subjects" -> subjects - ) - JSONParser(duplicateData).toString - } - } - /** * Finds the duplicates of each block by comparing the Subjects and filtering all Subjects pairs below the * threshold confidence. @@ -210,32 +151,12 @@ object Deduplication { confidence: Double, scoreConfigBroadcast: Broadcast[List[AttributeConfig]] ): RDD[(Subject, Subject, Double)] = { - blocks.flatMap { block => - val filterFunction = (s1: Subject, s2: Subject) => - compare(s1, s2, scoreConfigBroadcast.value) >= confidence - block - .crossProduct(filterFunction) - .map { case (subject1, subject2) => - val score = compare(subject1, subject2, scoreConfigBroadcast.value) - (subject1, subject2, score) - } - } - } - - def findDuplicates2( - blocks: RDD[Block], - confidence: Double, - scoreConfigBroadcast: Broadcast[List[AttributeConfig]] - ): RDD[(Subject, Subject, List[Double], Double)] = { - blocks.flatMap { block => - val filterFunction = (s1: Subject, s2: Subject) => - compare(s1, s2, scoreConfigBroadcast.value) >= confidence - block - .crossProduct(filterFunction) - .map { case (subject1, subject2) => - val (mean, scores) = compare2(subject1, subject2, scoreConfigBroadcast.value) - (subject1, subject2, scores, mean) - } - } + blocks + .flatMap(_.crossProduct()) + .map { case (subject1, subject2) => + val score = compare(subject1, subject2, scoreConfigBroadcast.value) + (subject1, subject2, score) + }.filter(_._3 >= confidence) + .repartition(64) } } diff --git a/src/main/scala/de/hpi/ingestion/deduplication/DeduplicationCandidateExport.scala b/src/main/scala/de/hpi/ingestion/deduplication/DeduplicationCandidateExport.scala new file mode 100644 index 00000000..a6032fc8 --- /dev/null +++ b/src/main/scala/de/hpi/ingestion/deduplication/DeduplicationCandidateExport.scala @@ -0,0 +1,185 @@ +/* +Copyright 2016-17, Hasso-Plattner-Institut fuer Softwaresystemtechnik GmbH + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + +http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/
+
+package de.hpi.ingestion.deduplication
+
+import com.datastax.spark.connector._
+import de.hpi.ingestion.dataimport.JSONParser
+import de.hpi.ingestion.datalake.models.Subject
+import de.hpi.ingestion.deduplication.blockingschemes._
+import de.hpi.ingestion.deduplication.models.Block
+import de.hpi.ingestion.deduplication.models.config.AttributeConfig
+import de.hpi.ingestion.deduplication.similarity.SimilarityMeasure
+import de.hpi.ingestion.framework.SparkJob
+import org.apache.spark.SparkContext
+import org.apache.spark.broadcast.Broadcast
+import org.apache.spark.rdd.RDD
+
+class DeduplicationCandidateExport extends SparkJob {
+    sparkOptions("spark.yarn.executor.memoryOverhead") = "8192"
+
+    appName = "Deduplication Candidate Export"
+    configFile = "deduplication_annotation.xml"
+    val blockingSchemes = List[BlockingScheme](
+        SimpleBlockingScheme("simple_scheme"),
+        LastLettersBlockingScheme("lastLetter_scheme"),
+        MappedListBlockingScheme("mappedSectors_scheme", x => x.slice(0, 4), "gen_sectors"),
+        MappedListBlockingScheme("mappedPostal_scheme", x => x.slice(0, 3), "geo_postal")
+    )
+
+    var subjects: RDD[Subject] = _
+    var stagedSubjects: RDD[Subject] = _
+    var duplicatesJson: RDD[String] = _
+
+    // $COVERAGE-OFF$
+    /**
+      * Loads the Subjects and the staged Subjects from Cassandra.
+      * @param sc SparkContext to be used for the job
+      */
+    override def load(sc: SparkContext): Unit = {
+        subjects = sc.cassandraTable[Subject](settings("keyspaceSubjectTable"), settings("subjectTable"))
+        stagedSubjects = sc.cassandraTable[Subject](settings("keyspaceStagingTable"), settings("stagingTable"))
+    }
+
+    /**
+      * Saves the JSON serialized duplicate candidates as a text file.
+      * @param sc SparkContext to be used for the job
+      */
+    override def save(sc: SparkContext): Unit = {
+        duplicatesJson.saveAsTextFile(s"deduplication_training_data_${System.currentTimeMillis()}")
+    }
+    // $COVERAGE-ON$
+
+    /**
+      * Blocks the Subjects and finds duplicates between the Subjects and the staged Subjects.
+      * @param sc Spark Context used to e.g. broadcast variables
+      */
+    override def run(sc: SparkContext): Unit = {
+        val blocking = new Blocking
+        settings.get("maxBlockSize").foreach(blocking.setMaxBlockSize)
+        settings.get("minBlockSize").foreach(blocking.setMinBlockSize)
+        settings.get("filterUndefined").foreach(blocking.setFilterUndefined)
+        settings.get("filterSmall").foreach(blocking.setFilterSmall)
+        val comparingWithMasterSubjects = settings("subjectTable") == "subject"
+        blocking.subjects = subjects.filter(_.isSlave || !comparingWithMasterSubjects)
+        blocking.stagedSubjects = stagedSubjects
+        blocking.setPartitioning(sc)
+        blocking.subjectReductionFunction = (subject: Subject) => {
+            subject.master_history = Nil
+            subject.name_history = Nil
+            subject.category_history = Nil
+            subject.properties_history = Map()
+            subject.relations_history = Map()
+            subject.aliases_history = Nil
+            subject
+        }
+
+        // TODO: set blocking schemes
+        val blocks = blocking.blocking().values
+        val unnormalizedScoreConfig = scoreConfigSettings
+            .map { aConf =>
+                val scoreConf = aConf.scoreConfigs.map(_.copy[String, SimilarityMeasure[String]](weight = 1.0))
+                aConf.copy(scoreConfigs = scoreConf, weight = 1.0)
+            }
+        val scoreConfigBroadcast = sc.broadcast(unnormalizedScoreConfig)
+        val subjectPairs = findDuplicates(blocks, settings("confidence").toDouble, scoreConfigBroadcast)
+        duplicatesJson = createDuplicates(subjectPairs, settings("subjectTable"), settings("stagingTable"))
+    }
+
+    /**
+      * Compares two Subjects according to the configuration and returns the scores for each similarity measure as well
+      * as the mean of those scores.
+      * @param subject1 the first Subject of the comparison
+      * @param subject2 the second Subject of the comparison
+      * @return tuple of the mean of the similarity scores and a list of the scores
+      */
+    def compare(
+        subject1: Subject,
+        subject2: Subject,
+        attributeConfigs: List[AttributeConfig] = Nil,
+        scale: Int = 1
+    ): (Double, List[Double]) = {
+        val scores = for {
+            AttributeConfig(attribute, weight, configs) <- attributeConfigs
+            subjectValues = subject1.get(attribute)
+            stagingValues = subject2.get(attribute)
+            if subjectValues.nonEmpty && stagingValues.nonEmpty
+            config <- configs
+        } yield CompareStrategy(attribute)(subjectValues, stagingValues, config) * weight
+        (scores.sum / scores.length, scores)
+    }
+
+    /**
+      * Groups all found duplicates by the staged Subjects. These groups are then serialized into JSON.
+      * @param subjectPairs RDD containing all found duplicates and their scores
+      * @param sourceTable source table of the existing Subjects
+      * @param stagingTable source table of the staged Subjects
+      * @return RDD of the serialized duplicate groups
+      */
+    def createDuplicates(
+        subjectPairs: RDD[(Subject, Subject, List[Double], Double)],
+        sourceTable: String,
+        stagingTable: String
+    ): RDD[String] = {
+        subjectPairs
+            .map { case (subject, staging, scores, mean) =>
+                (staging.id, List((subject, staging, scores, mean)))
+            }.reduceByKey(_ ::: _)
+            .values
+            .map { duplicateList =>
+                val staging = duplicateList.head._2
+                val subjects = duplicateList
+                    .sortBy { case (subject, staging, scores, mean) => -mean }
+                    .take(10)
+                    .map { case (subject, staging, scores, mean) =>
+                        Map(
+                            "mean" -> mean,
+                            "scores" -> scores,
+                            "subject" -> subject
+                        )
+                    }
+                val duplicateData = Map(
+                    "staging" -> staging,
+                    "staging_source_table" -> stagingTable,
+                    "subject_source_table" -> sourceTable,
+                    "subjects" -> subjects
+                )
+                JSONParser(duplicateData).toString
+            }
+    }
+
+    /**
+      * Finds the duplicates of each block by comparing the Subjects and filtering all Subject pairs below the
+      * threshold confidence. The comparison scores and their mean are added to all Subject pairs that are kept.
+      * @param blocks RDD of Blocks produced by the Blocking
+      * @param confidence threshold used to filter duplicate candidates
+      * @param scoreConfigBroadcast Broadcast of the score config
+      * @return RDD of tuples of both Subjects, the scores of the similarity measures and their mean
+      */
+    def findDuplicates(
+        blocks: RDD[Block],
+        confidence: Double,
+        scoreConfigBroadcast: Broadcast[List[AttributeConfig]]
+    ): RDD[(Subject, Subject, List[Double], Double)] = {
+        blocks
+            .flatMap(_.crossProduct())
+            .map { case (subject1, subject2) =>
+                val (mean, scores) = compare(subject1, subject2, scoreConfigBroadcast.value)
+                (subject1, subject2, scores, mean)
+            }.filter(_._4 >= confidence)
+            .repartition(64)
+    }
+}
diff --git a/src/test/scala/de/hpi/ingestion/deduplication/DeduplicationCandidateExportTest.scala b/src/test/scala/de/hpi/ingestion/deduplication/DeduplicationCandidateExportTest.scala
new file mode 100644
index 00000000..f4e798dd
--- /dev/null
+++ b/src/test/scala/de/hpi/ingestion/deduplication/DeduplicationCandidateExportTest.scala
@@ -0,0 +1,31 @@
+/*
+Copyright 2016-17, Hasso-Plattner-Institut fuer Softwaresystemtechnik GmbH
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package de.hpi.ingestion.deduplication
+
+import com.holdenkarau.spark.testing.SharedSparkContext
+import org.scalatest.{FlatSpec, Matchers}
+
+class DeduplicationCandidateExportTest extends FlatSpec with SharedSparkContext with Matchers {
+    "Candidates" should "be exported" in {
+        val job = new DeduplicationCandidateExport
+        job.subjects = sc.parallelize(TestData.subjects)
+        job.stagedSubjects = sc.parallelize(TestData.stagings)
+        job.run(sc)
+        val duplicates = job.duplicatesJson.collect.toSet
+        duplicates should have size 2
+    }
+}
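
Reviewer note, not part of the patch: each line written by DeduplicationCandidateExport.save is one JSON document with the fields "staging", "staging_source_table", "subject_source_table" and "subjects", where every entry of "subjects" carries "mean", "scores" and "subject", as built in createDuplicates above. The sketch below shows one way such an export could be read back for annotation. It is an illustration only: it assumes play-json is available for parsing (substitute the project's actual JSON dependency if it differs), assumes the serialized Subject exposes a top-level "name" field, and the export path placeholder as well as the CandidateExportReader object and its topCandidates helper are hypothetical names introduced here, not part of this change.

import org.apache.spark.{SparkConf, SparkContext}
import play.api.libs.json.{JsValue, Json}

object CandidateExportReader {
    // Placeholder path; the job writes to s"deduplication_training_data_${System.currentTimeMillis()}".
    val exportPath = "deduplication_training_data_<timestamp>"

    // Extracts the staged Subject's name and the (name, mean score) of each candidate.
    // createDuplicates sorts the candidates by descending mean score before serializing them.
    def topCandidates(line: String): (Option[String], List[(Option[String], Option[Double])]) = {
        val json: JsValue = Json.parse(line)
        val stagingName = (json \ "staging" \ "name").asOpt[String]
        val candidates = (json \ "subjects").as[List[JsValue]].map { candidate =>
            ((candidate \ "subject" \ "name").asOpt[String], (candidate \ "mean").asOpt[Double])
        }
        (stagingName, candidates)
    }

    def main(args: Array[String]): Unit = {
        val sc = new SparkContext(new SparkConf().setAppName("Candidate Export Reader"))
        sc.textFile(exportPath)
            .map(topCandidates)
            .take(10)
            .foreach(println)
    }
}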