Skip to content

Commit

Permalink
Release 1.0.0
Browse files Browse the repository at this point in the history
  • Loading branch information
jkouvar committed Jul 17, 2017
1 parent 6d1659a commit 596467e
Show file tree
Hide file tree
Showing 33 changed files with 3,119 additions and 0 deletions.
94 changes: 94 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>eu.daiad</groupId>
    <artifactId>utility-mapreduce</artifactId>
    <version>1.0.0</version>
    <packaging>jar</packaging>

    <name>DAIAD@utility MapReduce jobs</name>


    <properties>
        <!-- Single source of truth for the Java language level. -->
        <java-version>1.7</java-version>
        <hadoop.version>2.6.0</hadoop.version>
        <hbase.version>1.0.0</hbase.version>
    </properties>

    <dependencies>

        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-common</artifactId>
            <version>${hadoop.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-hdfs</artifactId>
            <version>${hadoop.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-mapreduce-client-core</artifactId>
            <version>${hadoop.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-server</artifactId>
            <version>${hbase.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-client</artifactId>
            <version>${hbase.version}</version>
        </dependency>

        <dependency>
            <groupId>joda-time</groupId>
            <artifactId>joda-time</artifactId>
            <version>2.9.3</version>
        </dependency>

    </dependencies>

    <build>
        <plugins>

            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <!-- Pin the plugin version for reproducible builds
                     (an unpinned plugin version triggers a Maven warning). -->
                <version>3.1</version>
                <configuration>
                    <!-- Reference the java-version property instead of
                         hard-coding 1.7 in two more places. -->
                    <source>${java-version}</source>
                    <target>${java-version}</target>
                </configuration>
            </plugin>
            <!-- Build a fat (shaded) jar at package time so the job can be
                 submitted to the cluster as a single artifact. -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>2.4.3</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <transformers>
                                <!-- Merge META-INF/services files from all shaded jars. -->
                                <transformer
                                    implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer" />
                            </transformers>
                            <artifactSet>
                            </artifactSet>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>
73 changes: 73 additions & 0 deletions src/main/java/eu/daiad/mapreduce/hbase/Application.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
package eu.daiad.mapreduce.hbase;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import eu.daiad.mapreduce.hbase.job.MeterDataAggregator;
import eu.daiad.mapreduce.hbase.job.MeterForecastingDataAggregator;

/**
 * Entry point that selects and runs a DAIAD HBase MapReduce job by name.
 */
public class Application {

    /** Separator between key and value in command-line arguments. */
    private final static char ARGUMENT_DELIMITER = '=';

    /**
     * Application entry point.
     *
     * Exits with the job status returned by {@link ToolRunner#run}, so that a
     * failed job produces a non-zero process exit code (previously the status
     * was discarded and the process always exited 0).
     *
     * @param args the job arguments expressed as key value pairs e.g. key1=value1 key2=value2.
     * @throws Exception if argument parsing fails or job does not exist.
     */
    public static void main(String[] args) throws Exception {
        Application application = new Application();
        System.exit(application.execute(args));
    }

    /**
     * Executes a map reduce job given its name. The job name is declared using
     * the argument with key {@code mapreduce.job.name}.
     *
     * The job arguments are expressed as key value pairs separated by = e.g.
     * key1=value1 key2=value2. During runtime, any white space characters
     * around = are considered part of the key and value respectively.
     *
     * @param args the job arguments expressed as key value pairs e.g. key1=value1 key2=value2.
     * @return the exit status of the job (0 on success).
     * @throws Exception if argument parsing fails or job does not exist.
     */
    private int execute(String[] args) throws Exception {
        // Create configuration seeded with HBase/Hadoop defaults
        Configuration config = HBaseConfiguration.create();

        for (String argument : args) {
            String[] tokens = StringUtils.split(argument, ARGUMENT_DELIMITER);

            // Silently ignore arguments that are not exactly key=value
            if (tokens.length == 2) {
                config.set(tokens[0], tokens[1]);
            }
        }

        // Add default parameters
        config.set("mapreduce.map.speculative", "false");
        config.set("yarn.nodemanager.aux-services", "mapreduce_shuffle");

        // Create tool
        Tool tool = createTool(config.get(EnumJobMapReduceParameter.JOB_NAME.getValue()));

        // Execute the job and propagate its status to the caller
        return ToolRunner.run(config, tool, new String[0]);
    }

    /**
     * Creates the appropriate {@link Tool} for running the job given its name.
     *
     * @param jobName the job name.
     * @return a {@link Tool} implementation for running the job.
     * @throws Exception if job with the given name is not implemented.
     */
    private Tool createTool(String jobName) throws Exception {
        EnumJob job = EnumJob.fromValue(jobName);

        // EnumJob.fromValue returns null for unknown names; switching on null
        // would throw a NullPointerException before reaching the default
        // branch, so fail with a descriptive error instead.
        if (job == null) {
            throw new Exception(String.format("Implementation for job [%s] was not found.", jobName));
        }

        switch (job) {
        case METER_DATA_AGGREGATE:
            return new MeterDataAggregator();
        case METER_FORECASTING_DATA_AGGREGATE:
            return new MeterForecastingDataAggregator();
        default:
            throw new Exception(String.format("Implementation for job [%s] was not found.", jobName));
        }

    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
package eu.daiad.mapreduce.hbase;

/**
 * Enumeration of custom MapReduce job parameters.
 */
public enum EnumAggregationJobParameter {
    /**
     * Sentinel returned when a configuration key matches no known parameter.
     */
    NOT_SUPPORTED(""),
    /**
     * HBase input table.
     */
    INPUT_TABLE("daiad.hbase.table.input"),
    /**
     * HBase output table.
     */
    OUTPUT_TABLE("daiad.hbase.table.output"),
    /**
     * Default column family for input/output tables.
     */
    COLUMN_FAMILY("daiad.hbase.column-family"),
    /**
     * Number of partitions.
     */
    PARTITIONS("daiad.hbase.data.partitions"),
    /**
     * Date interval format.
     */
    DATE_FORMAT("daiad.interval.format"),
    /**
     * Date interval start instant.
     */
    DATE_FROM("daiad.interval.from"),
    /**
     * Date interval end instant.
     */
    DATE_TO("daiad.interval.to"),
    /**
     * Top-k / Bottom-k query limit.
     */
    TOP_QUERY_LIMIT("daiad.top.query.limit"),
    /**
     * Cached file with groups.
     */
    FILENAME_GROUPS("daiad.filename.groups"),
    /**
     * Cached file with users.
     */
    FILENAME_USERS("daiad.filename.users");

    /** Configuration key backing this parameter. */
    private final String value;

    private EnumAggregationJobParameter(String value) {
        this.value = value;
    }

    /**
     * Returns the configuration key for this parameter.
     *
     * @return the configuration key.
     */
    public String getValue() {
        return value;
    }

    /**
     * Resolves a configuration key to its parameter, ignoring case.
     *
     * @param value the configuration key to look up.
     * @return the matching parameter, or {@link #NOT_SUPPORTED} if none matches.
     */
    public static EnumAggregationJobParameter fromString(String value) {
        EnumAggregationJobParameter match = NOT_SUPPORTED;
        for (EnumAggregationJobParameter candidate : values()) {
            if (candidate.value.equalsIgnoreCase(value)) {
                match = candidate;
                break;
            }
        }
        return match;
    }
}
39 changes: 39 additions & 0 deletions src/main/java/eu/daiad/mapreduce/hbase/EnumHBaseParameter.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
package eu.daiad.mapreduce.hbase;

/**
 * Enumeration of HBase parameters.
 */
public enum EnumHBaseParameter {
    /**
     * Sentinel returned when a configuration key matches no known parameter.
     */
    NOT_SUPPORTED(""),
    /**
     * Comma separated list of servers in the ZooKeeper ensemble.
     *
     * NOTE(review): the constant name JOB_NAME does not match the ZooKeeper
     * quorum key it holds — likely a copy-paste slip from another enum.
     * Renaming would break external callers, so it is left as-is.
     */
    JOB_NAME("hbase.zookeeper.quorum"),
    /**
     * Number of rows that we try to fetch when calling next on a scanner if it
     * is not served from (local, client) memory.
     *
     * NOTE(review): the constant name JAR_NAME does not match the scanner
     * caching key it holds — see the note on JOB_NAME above.
     */
    JAR_NAME("hbase.client.scanner.caching");

    /** Configuration key backing this parameter. */
    private final String value;

    private EnumHBaseParameter(String value) {
        this.value = value;
    }

    /**
     * Returns the configuration key for this parameter.
     *
     * @return the configuration key.
     */
    public String getValue() {
        return value;
    }

    /**
     * Resolves a configuration key to its parameter, ignoring case.
     *
     * @param value the configuration key to look up.
     * @return the matching parameter, or {@link #NOT_SUPPORTED} if none matches.
     */
    public static EnumHBaseParameter fromString(String value) {
        for (EnumHBaseParameter candidate : values()) {
            if (candidate.value.equalsIgnoreCase(value)) {
                return candidate;
            }
        }
        return NOT_SUPPORTED;
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
package eu.daiad.mapreduce.hbase;

/**
 * Enumeration of MapReduce parameters.
 */
public enum EnumHadoopMapReduceParameter {
    /**
     * Sentinel returned when a configuration key matches no known parameter.
     */
    NOT_SUPPORTED(""),
    /**
     * The runtime framework for executing MapReduce jobs. Can be one of local,
     * classic or yarn.
     */
    FRAMEWORK_NAME("mapreduce.framework.name");

    /** Configuration key backing this parameter. */
    private final String value;

    private EnumHadoopMapReduceParameter(String value) {
        this.value = value;
    }

    /**
     * Returns the configuration key for this parameter.
     *
     * @return the configuration key.
     */
    public String getValue() {
        return value;
    }

    /**
     * Resolves a configuration key to its parameter, ignoring case.
     *
     * @param value the configuration key to look up.
     * @return the matching parameter, or {@link #NOT_SUPPORTED} if none matches.
     */
    public static EnumHadoopMapReduceParameter fromString(String value) {
        EnumHadoopMapReduceParameter[] all = EnumHadoopMapReduceParameter.values();
        for (int i = 0; i < all.length; i++) {
            if (all[i].value.equalsIgnoreCase(value)) {
                return all[i];
            }
        }
        return NOT_SUPPORTED;
    }
}
34 changes: 34 additions & 0 deletions src/main/java/eu/daiad/mapreduce/hbase/EnumHadoopParameter.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
package eu.daiad.mapreduce.hbase;

/**
 * Enumeration of Hadoop parameters.
 */
public enum EnumHadoopParameter {
    /**
     * Sentinel returned when a configuration key matches no known parameter.
     */
    NOT_SUPPORTED(""),
    /**
     * HDFS path.
     */
    HDFS_PATH("fs.defaultFS");

    /** Configuration key backing this parameter. */
    private final String value;

    private EnumHadoopParameter(String value) {
        this.value = value;
    }

    /**
     * Returns the configuration key for this parameter.
     *
     * @return the configuration key.
     */
    public String getValue() {
        return value;
    }

    /**
     * Resolves a configuration key to its parameter, ignoring case.
     *
     * @param value the configuration key to look up.
     * @return the matching parameter, or {@link #NOT_SUPPORTED} if none matches.
     */
    public static EnumHadoopParameter fromString(String value) {
        EnumHadoopParameter match = NOT_SUPPORTED;
        for (EnumHadoopParameter candidate : values()) {
            if (candidate.value.equalsIgnoreCase(value)) {
                match = candidate;
                break;
            }
        }
        return match;
    }
}
34 changes: 34 additions & 0 deletions src/main/java/eu/daiad/mapreduce/hbase/EnumJob.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
package eu.daiad.mapreduce.hbase;

/**
 * Enumeration of implemented jobs.
 */
public enum EnumJob {
    /**
     * Smart water meter data aggregation.
     */
    METER_DATA_AGGREGATE("meter-data-pre-aggregation"),
    /**
     * Smart water meter forecasting data aggregation.
     */
    METER_FORECASTING_DATA_AGGREGATE("meter-forecasting-data-pre-aggregation");

    /** External name used to select this job at runtime. */
    private final String value;

    private EnumJob(String value) {
        this.value = value;
    }

    /**
     * Returns the external name of this job.
     *
     * @return the job name.
     */
    public String getValue() {
        return value;
    }

    /**
     * Resolves a job name to its enumeration constant, ignoring case.
     *
     * @param value the job name to look up.
     * @return the matching job, or {@code null} when no job has that name.
     *         NOTE(review): callers must null-check the result before using
     *         it (e.g. in a switch statement).
     */
    public static EnumJob fromValue(String value) {
        EnumJob[] jobs = EnumJob.values();
        for (int i = 0; i < jobs.length; i++) {
            if (jobs[i].value.equalsIgnoreCase(value)) {
                return jobs[i];
            }
        }
        return null;
    }
}
Loading

0 comments on commit 596467e

Please sign in to comment.