Sparkdeduplication #429
base: master
Conversation
A new version of the clustering mechanism with auto-sizing clusters, to obtain better scalability.
Added programmatic logging to stdout for easier log reading.
Added reshuffle for better work balance.
Needs code cleanup and quality assurance.
Version used for performance testing.
Task tiling class rewritten in Scala, with tests.
Cleaning up project files. Fixed workflow building for oozie.
…into sparkdeduplication
import pl.edu.icm.coansys.deduplication.document.comparator.VotesProductComparator
import pl.edu.icm.coansys.deduplication.document.comparator.WorkComparator
import scala.collection.mutable.ListBuffer
import pl.edu.icm.coansys.document.deduplication._
Unused import
The code is overall good, though there is one major issue: the CartesianTaskSplit.processPairs function returns an empty list. It doesn't look like it should work.
There are also some minor style and performance observations (reduceByKey and foldByKey used to achieve the results of a groupByKey operation) noted in the comments.
Apart from the places indicated in the comments, it would be great to go through the IDE suggestions and reformat the whole code.
Thanks for the code.
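The reduceByKey-versus-groupByKey point can be illustrated with plain Scala collections (an analogy only; in Spark, reduceByKey additionally pre-aggregates on the map side so only partial results cross the shuffle, which local collections cannot show):

```scala
object ReduceVsGroup {
  def main(args: Array[String]): Unit = {
    val pairs = Seq(("a", 1), ("b", 2), ("a", 3))

    // groupByKey-style: materialize every value per key, then sum
    val viaGroup: Map[String, Int] =
      pairs.groupBy(_._1).map { case (k, vs) => (k, vs.map(_._2).sum) }

    // reduceByKey-style: merge values pairwise as they arrive, never
    // holding a whole group in memory; this is the property that lets
    // Spark combine values on each partition before shuffling
    val viaReduce: Map[String, Int] =
      pairs.foldLeft(Map.empty[String, Int]) { case (acc, (k, v)) =>
        acc.updated(k, acc.getOrElse(k, 0) + v)
      }

    assert(viaGroup == viaReduce)
    println(viaReduce("a")) // 4
  }
}
```

Both produce the same result, which is why the substitution is safe whenever the downstream code only needs the reduced value rather than the full group.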
 *
 */
object DeduplicateDocuments {
  val log = org.slf4j.LoggerFactory.getLogger(getClass().getName())
Empty parentheses should be removed in method calls that do not have side effects (here and in other places in this code)
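A minimal sketch of the convention (the Counter class is hypothetical, not from this PR): parameterless, side-effect-free accessors are declared and called without parentheses, while side-effecting methods keep them to signal mutation:

```scala
class Counter {
  private var n = 0

  // Accessor with no side effects: no parentheses, reads like a field
  def value: Int = n

  // Side-effecting method: keep the parentheses at definition and call site
  def increment(): Unit = n += 1
}

object ParensDemo {
  def main(args: Array[String]): Unit = {
    val c = new Counter
    c.increment()
    println(c.value) // 1
  }
}
```

By the same rule, `getClass().getName()` above would be written `getClass.getName`, since both calls are pure accessors.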
 *
 */
object DeduplicateDocuments {
  val log = org.slf4j.LoggerFactory.getLogger(getClass().getName())
The 'log' variable can be private.
object DeduplicateDocuments {
  val log = org.slf4j.LoggerFactory.getLogger(getClass().getName())

  implicit def toJavaBiPredicate[A, B](predicate: (A, B) => Boolean) =
Import scala.language.implicitConversions to turn off compiler warnings about implicits
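A self-contained sketch of the suggestion (the wrapper body and names here are assumptions mirroring the shape of the PR's helper, not the actual code):

```scala
// Without this import the compiler emits a feature warning for every
// implicit def that performs a conversion.
import scala.language.implicitConversions

import java.util.function.BiPredicate

object Conversions {
  // Wrap a Scala binary function as a java.util.function.BiPredicate
  implicit def toJavaBiPredicate[A, B](p: (A, B) => Boolean): BiPredicate[A, B] =
    new BiPredicate[A, B] { override def test(a: A, b: B): Boolean = p(a, b) }

  def main(args: Array[String]): Unit = {
    val bothPositive: (Int, Int) => Boolean = (a, b) => a > 0 && b > 0
    // The implicit conversion is applied at this assignment
    val jp: BiPredicate[Int, Int] = bothPositive
    println(jp.test(1, 2)) // true
  }
}
```

Alternatively, annotating the file (or compiling with -language:implicitConversions) achieves the same effect project-wide.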
  } else {
    false
  }
}
isValidDocument could be rewritten in a functional programming style using the Option monad:

def isValidDocument(doc: DocumentWrapper): Boolean =
  Option(doc.getDocumentMetadata)
    .flatMap(md => Option(md.getBasicMetadata))
    .exists(bmd => bmd.getTitleCount > 0 || bmd.getAuthorCount > 0 || bmd.hasDoi || bmd.hasJournal)

"The Option companion object's apply method serves as a conversion function from nullable references" https://stackoverflow.com/questions/4692506/wrapping-null-returning-method-in-java-with-option-in-scala
It would be even better if we used the scala protobuf compiler, which directly supports Option for optional values in protobufs (see https://scalapb.github.io/generated-code.html).
flatMap acts on monads like bind in Haskell, if that clarifies anything. A simple introduction to using Option as a monad in Scala, and how it makes code clearer: https://www.slideshare.net/jankrag/introduction-to-option-monad-in-scala
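A runnable sketch of the null-safe chaining (the case classes are hypothetical stand-ins for the protobuf types, assuming, as the snippet above does, that absent fields come back as null):

```scala
object OptionDemo {
  // Illustrative stand-ins, not the real generated protobuf classes
  final case class BasicMetadata(titleCount: Int, authorCount: Int)
  final case class DocumentMetadata(basic: BasicMetadata)
  final case class DocumentWrapper(metadata: DocumentMetadata)

  def isValidDocument(doc: DocumentWrapper): Boolean =
    Option(doc.metadata)               // null becomes None, short-circuiting
      .flatMap(md => Option(md.basic)) // same for the nested field
      .exists(b => b.titleCount > 0 || b.authorCount > 0)

  def main(args: Array[String]): Unit = {
    println(isValidDocument(DocumentWrapper(null)))                                  // false
    println(isValidDocument(DocumentWrapper(DocumentMetadata(BasicMetadata(1, 0))))) // true
  }
}
```

The chain never dereferences a null: each Option(...) lifts a possibly-null value into None/Some, and flatMap/exists only run on the Some branch.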
  def main(args: Array[String]): Unit = {
It would be more readable if the main method was placed before the helper methods.
    }
  }
}).mapValues(_._1)
inputDocs.join(selectedClusters).map(p => (p._2._2, p._2._1)).groupByKey
Last expression could be simplified to:
selectedClusters.join(inputDocs).values.groupByKey
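The equivalence can be checked with a toy join over Scala maps (an analogy only; the hypothetical join helper mimics RDD.join producing (key, (leftValue, rightValue)) pairs):

```scala
object JoinOrder {
  // Minimal stand-in for RDD.join: inner join by key over in-memory maps
  def join[K, A, B](left: Map[K, A], right: Map[K, B]): Seq[(K, (A, B))] =
    left.toSeq.collect { case (k, a) if right.contains(k) => (k, (a, right(k))) }

  def main(args: Array[String]): Unit = {
    val inputDocs = Map("doc1" -> "contentA", "doc2" -> "contentB")
    val selectedClusters = Map("doc1" -> "c1", "doc2" -> "c1")

    // Original: join, then swap the tuple so the cluster id becomes the key
    val swapped = join(inputDocs, selectedClusters).map(p => (p._2._2, p._2._1))

    // Suggested: join in the other order; the values are already (cluster, doc)
    val direct = join(selectedClusters, inputDocs).map(_._2)

    assert(swapped.toSet == direct.toSet)
  }
}
```

Reversing the join order moves the cluster id into the first tuple slot up front, so the explicit swap (and the positional p._2._2 accessors) disappears.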
 */
def mergeDocuments(docs: List[DocumentWrapper]): DocumentWrapper = {
  val merger = buildDocumentsMerger()
  val merged = merger.merge(docs);
No need to define val merged; the result of merger.merge(docs) can be returned directly.
val normalized = StringTools.normalize(title);
//seems that normalize removes stopwords, which is wrong, and quite expensive
//val normalized = StringTools.removeStopWords(StringTools.normalize(title));
val res = normalized.replaceAll("\\s+", "")
There is no need to define val res here; the expression assigned to it could simply be the last expression in this function.
def generateKeys(title: String): Seq[String] = {
  val ctitle = cleanUpString(title)
  val mlen = keySizes.max
  val longestKey = ctitle.zipWithIndex.filter(_._2 % 2 == 0).map(_._1).take(mlen).mkString
Line too long.
val ctitle = cleanUpString(title)
val mlen = keySizes.max
val longestKey = ctitle.zipWithIndex.filter(_._2 % 2 == 0).map(_._1).take(mlen).mkString
keySizes.map(keyLength => longestKey.substring(0, Math.min(keyLength, longestKey.size))).distinct
Line too long.
Ready, working version of the Scala/Spark deduplication, to replace the original MapReduce/Pig solution.