Skip to content

Commit

Permalink
Fixes #576: Fix and test KompassDataLakeImport (#579)
Browse files Browse the repository at this point in the history
* Refs #576: adds name to kompass subjects

* Refs #576: fixes missing subject name, moves address normalization and changes normalized fields

* Refs #576: adds missing tests to datalake import and normalization

* Refs #576: removes non-breaking spaces from kompass html

* Refs #576: removes unnecessary filter

* Refs #576: changes the excluded scalastyle rules in kompass test
  • Loading branch information
janehmueller authored Oct 1, 2017
1 parent 4b19554 commit c0fa2f9
Show file tree
Hide file tree
Showing 8 changed files with 230 additions and 153 deletions.
35 changes: 22 additions & 13 deletions src/main/resources/configs/normalization_kompass.xml
Original file line number Diff line number Diff line change
Expand Up @@ -11,34 +11,23 @@
<mapping>specified_sector</mapping>
<mapping>sector</mapping>
</attribute>
<attribute>
<key>geo_district</key>
<mapping>district</mapping>
</attribute>
<attribute>
<key>gen_activities</key>
<mapping>activities</mapping>
</attribute>
<attribute>
<key>gen_employees</key>
<mapping>employees</mapping>
</attribute>
<attribute>
<key>geo_county</key>
<mapping>county</mapping>
<mapping>district</mapping>
</attribute>
<attribute>
<key>id_kompass</key>
<mapping>url</mapping>
</attribute>
<attribute>
<key>gen_fax</key>
<key>gen_phones</key>
<mapping>Fax</mapping>
</attribute>
<attribute>
<key>geo_city</key>
<mapping>city</mapping>
</attribute>
<attribute>
<key>date_founding</key>
<mapping>Gründungsjahr</mapping>
Expand All @@ -62,8 +51,28 @@
<attribute>
<key>id_tax</key>
<mapping>MwSt.</mapping>
</attribute>
<attribute>
<key>cr_ids</key>
<mapping>Handelsregister</mapping>
</attribute>
<attribute>
<key>geo_postal</key>
<mapping>address</mapping>
</attribute>
<attribute>
<key>geo_street</key>
<mapping>address</mapping>
</attribute>
<attribute>
<key>geo_city</key>
<mapping>city</mapping>
<mapping>address</mapping>
</attribute>
<attribute>
<key>geo_country</key>
<mapping>address</mapping>
</attribute>
</attributeMapping>
</normalization>
<categorization>
Expand Down
2 changes: 0 additions & 2 deletions src/main/resources/normalized_properties.txt
Original file line number Diff line number Diff line change
Expand Up @@ -45,14 +45,12 @@ gen_employees
gen_sectors
gen_urls
gen_phones
gen_fax
gen_emails
gen_legal_form
gen_founder
gen_description
gen_turnover
gen_ceo
gen_activities
gen_capital
gen_management
gen_description
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,9 @@ object KompassDataLakeImport extends DataLakeImportImplementation[KompassEntity]
"datalake",
"kompass_entities"
){
appName = "KompassDataLakeImport_v1.0"
configFile = "datalake_import_kompass.xml"
importConfigFile = "normalization_kompass.xml"
appName = "KompassDataLakeImport_v1.0"

// $COVERAGE-OFF$
/**
Expand All @@ -56,19 +56,6 @@ object KompassDataLakeImport extends DataLakeImportImplementation[KompassEntity]
if(attribute == "gen_sectors") normalized.flatMap(x => strategies.getOrElse(x, List(x))) else normalized
}

/**
 * Splits an address string into its normalized address properties.
 * @param address address of the form "<street> <5-digit postal code> <city> Deutschland"
 * @return map with the keys geo_street, geo_postal, geo_city and geo_country, or an empty map
 * if the address does not have the expected form
 */
def extractAddress(address: String): Map[String, List[String]] = address match {
  case r"""(.+)${streetName} (\d{5})${postalCode} (.+)${cityName} Deutschland""" =>
    // only the first abbreviated "str." of the street is expanded
    val normalizedStreet = streetName.replaceFirst("str\\.", "straße")
    Map(
      "geo_street" -> List(normalizedStreet),
      "geo_postal" -> List(postalCode),
      "geo_city" -> List(cityName),
      "geo_country" -> List("DE")
    )
  case _ =>
    Map.empty[String, List[String]]
}

override def translateToSubject(
entity: KompassEntity,
version: Version,
Expand All @@ -79,28 +66,17 @@ object KompassDataLakeImport extends DataLakeImportImplementation[KompassEntity]
val subject = Subject.empty(datasource = "kompass")
val sm = new SubjectManager(subject, version)

sm.setName(entity.name)
sm.setCategory(entity.instancetype)
sm.addProperties(entity.data)

val addresses = subject
.properties
.collect { case (key, value) if key == "address" => extractAddress(value.head) }
addresses.foreach(address => sm.addProperties(address))

val legalForm = subject.name.flatMap(extractLegalForm(_, classifier)).toList
val normalizedLegalForm = SharedNormalizations.normalizeLegalForm(legalForm)
sm.addProperties(Map("gen_legal_form" -> normalizedLegalForm))

val normalizedProperties = normalizeProperties(entity, mapping, strategies)
sm.addProperties(normalizedProperties)

if (normalizedProperties.contains("geo_coords_lat") && normalizedProperties.contains("geo_coords_long")) {
val coords = normalizedProperties("geo_coords_lat")
.zip(normalizedProperties("geo_coords_long"))
.map { case (lat, long) => s"$lat;$long" }
sm.addProperties(Map("geo_coords" -> coords))
}

subject
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,9 @@ object KompassNormalizationStrategy extends Serializable {
* @param value extracted legal
* @return fully normalized legal
*/
def kgMatch(value: String): List[String] = {
def kgMatch(value: String): String = {
val patternCoKg = "& Co\\. KG".r
if (patternCoKg.findFirstIn(value).isDefined) List("GmbH & Co. KG") else List("GmbH")
if(patternCoKg.findFirstIn(value).isDefined) "GmbH & Co. KG" else "GmbH"
}

/**
Expand All @@ -37,14 +37,13 @@ object KompassNormalizationStrategy extends Serializable {
* @return normalized legals
*/
def normalizeLegalForm(values: List[String]): List[String] = {
SharedNormalizations.normalizeLegalForm(values).flatMap {
case r""".*?\((.{0,4})${value}\)""" => List(value)
SharedNormalizations.normalizeLegalForm(values).map {
case r""".*?\((.{0,4})${value}\)""" => value
case r"""(?i).*?gesellschaft.*?haftungsbeschränkt(.*?)${value}""" => kgMatch(value)
case r"""(?i).*gesellschaft.*mbh(.*)${value}""" => kgMatch(value)
case r"""(?i).*gesellschaft.*mit beschränkter haftung(.*)${value}""" => kgMatch(value)
case r""".*GbR.*""" => List("GbR")
case r"""(?i).*ohg.*""" => List("OHG")
case value => List(value)
case r""".*GbR.*""" => "GbR"
case r"""(?i).*ohg.*""" => "OHG"
case value => value
}.distinct
}

Expand Down Expand Up @@ -82,12 +81,83 @@ object KompassNormalizationStrategy extends Serializable {
* @return normalized employee numbers
*/
def normalizeEmployees(values: List[String]): List[String] = {
  // pull the numbers out of the known employee descriptions; every other value is dropped
  val extractedNumbers = values.flatMap {
    case r"""Von ([0-9]+)${lower} bis ([0-9]+)${upper} Beschäftigte""" => List(lower, upper)
    case r"""([0-9]+)${count} Beschäftigte""" => List(count)
    case r"""Mehr als ([0-9]+)${minimum} Beschäftigte""" => List(minimum)
    case _ => Nil
  }
  // strip spaces and dots from the extracted numbers, then remove duplicates
  val cleanedNumbers = extractedNumbers.map(number => number.replaceAll("( |\\.)", ""))
  cleanedNumbers.distinct
}

/**
 * Extracts a single normalized field from an address string.
 *
 * The address has to be of the form "<street> <5-digit postal code> <city> Deutschland".
 * @param address String containing the address
 * @param property field of the address to extract, one of "geo_street", "geo_postal",
 * "geo_city" or "geo_country"
 * @return the requested address field, or None if the address does not have the expected form
 * or the property is not one of the supported address fields
 */
def extractAddress(address: String, property: String): Option[String] = {
  address match {
    case r"""(.+)${street} (\d{5})${postal} (.+)${city} Deutschland""" =>
      property match {
        // only the first abbreviated "str." of the street is expanded
        case "geo_street" => Option(street.replaceFirst("str\\.", "straße"))
        case "geo_postal" => Option(postal)
        case "geo_city" => Option(city)
        case "geo_country" => Option("DE")
        // an unsupported property used to fall through the match and raise a MatchError;
        // there is nothing to extract for it, so report that as None
        case _ => None
      }
    case _ => None
  }
}

/**
 * Extracts the normalized street of each address.
 * @param values list containing the address data
 * @return the streets of all addresses for which extractAddress returned a street
 */
def normalizeStreet(values: List[String]): List[String] = {
  values.flatMap(address => extractAddress(address, "geo_street").toList)
}

/**
 * Extracts the postal code of each address.
 * @param values list containing the address data
 * @return the postal codes of all addresses for which extractAddress returned a postal code
 */
def normalizePostal(values: List[String]): List[String] = {
  values.flatMap(address => extractAddress(address, "geo_postal").toList)
}

/**
* Normalizes the city.
* @param values list containing the address data
* @return normalized city
*/
def normalizeCity(values: List[String]): List[String] = {
values.flatMap {
case r"""Von ([0-9]+)${first} bis ([0-9]+)${second} Beschäftigte""" => List(first, second)
case r"""([0-9]+)${value} Beschäftigte""" => List(value)
case r"""Mehr als ([0-9]+)${value} Beschäftigte""" => List(value)
case _ => Nil
}.map(_.replaceAll("( |\\.)", "")).distinct
case address if address.endsWith("Deutschland") => extractAddress(address, "geo_city")
case city => Option(city)
}
}

/**
 * Extracts the normalized country of each address.
 * @param values list containing the address data
 * @return the countries of all addresses for which extractAddress returned a country
 */
def normalizeCountry(values: List[String]): List[String] = {
  values.flatMap(address => extractAddress(address, "geo_country").toList)
}

/**
 * The default normalization drops empty strings and duplicate values; the remaining values
 * are left unchanged.
 * @param values Strings to be normalized
 * @return the non-empty values without duplicates, in order of first occurrence
 */
def normalizeDefault(values: List[String]): List[String] = {
  val nonEmptyValues = values.filterNot(_.isEmpty)
  nonEmptyValues.distinct
}

/**
Expand All @@ -96,13 +166,17 @@ object KompassNormalizationStrategy extends Serializable {
* @return Normalization method
*/
def apply(attribute: String): (List[String]) => List[String] = {
attribute match {
case "gen_legal_form" => normalizeLegalForm
case "gen_capital" => normalizeCapital
case "gen_turnover" => normalizeTurnover
case "gen_employees" => normalizeEmployees
case _ => identity
}
(attribute match {
case "gen_legal_form" => normalizeLegalForm _
case "gen_capital" => normalizeCapital _
case "gen_turnover" => normalizeTurnover _
case "gen_employees" => normalizeEmployees _
case "geo_street" => normalizeStreet _
case "geo_postal" => normalizePostal _
case "geo_city" => normalizeCity _
case "geo_country" => normalizeCountry _
case _ => identity[List[String]] _
}) compose normalizeDefault
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ object KompassParse extends SparkJob with JSONParser[KompassEntity]{
providedAttributes.foreach(attribute => extractedData += attribute -> List((json \ attribute).as[String]))
val name = getValue(json, List("name")).map(_.as[String])

val doc = Jsoup.parse((json \ "html").as[String])
val doc = Jsoup.parse((json \ "html").as[String].replaceAll("&nbsp;", " "))
attributeSelectors.foreach { case (attribute, selector) =>
val elem = doc.select(selector).first
if (elem != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,16 @@ import com.holdenkarau.spark.testing.SharedSparkContext
import de.hpi.ingestion.implicits.CollectionImplicits._

class KompassDataLakeImportTest extends FlatSpec with Matchers with SharedSparkContext {
"extractAddress" should "extract and normalize street, postal, city and country from the address" in {
val addresses = TestData.unnormalizedAddresses
val normalizedAddresses = addresses.map(KompassDataLakeImport.extractAddress)
val expected = TestData.normalizedAddresses
normalizedAddresses shouldEqual expected
"Subject translation" should "translate all possible data" in {
val entity = TestData.kompassEntity
val version = TestData.version(sc)
val mapping = KompassDataLakeImport.normalizationSettings
val strategies = Map.empty[String, List[String]]
val classifier = KompassDataLakeImport.classifier
val subject = KompassDataLakeImport.translateToSubject(entity, version, mapping, strategies, classifier)
val expectedSubject = TestData.translatedSubject
subject.name shouldEqual entity.name
subject.category shouldEqual expectedSubject.category
subject.properties shouldEqual expectedSubject.properties
}
}
Loading

0 comments on commit c0fa2f9

Please sign in to comment.