Skip to content
This repository has been archived by the owner on Mar 3, 2023. It is now read-only.

DO NOT MERGE: Hackweek project to get Trident working in Heron #1807

Open
wants to merge 13 commits into
base: master
Choose a base branch
from
12 changes: 12 additions & 0 deletions WORKSPACE
Original file line number Diff line number Diff line change
Expand Up @@ -277,6 +277,18 @@ maven_jar(
artifact = "org.apache.reef:tang:" + reef_version
)

# added for trident prototype
maven_jar(
name = "org_clojure_clojure",
artifact = "org.clojure:clojure:1.7.0"
)

# added for trident prototype
maven_jar(
name = "org_apache_storm_core",
artifact = "org.apache.storm:storm-core:1.0.0"
)

maven_jar(
name = "org_apache_thrift_libthrift",
artifact = "org.apache.thrift:libthrift:0.5.0-1",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ public static void loggerInit(Level level, boolean isRedirectStdOutErr, String f
if (rootLogger.getLevel().intValue() < Level.WARNING.intValue()) {
// zookeeper logging scares me. if people want this, we can patch to config-drive this
Logger.getLogger("org.apache.zookeeper").setLevel(Level.WARNING);
Logger.getLogger("org.apache.storm.shade.org.apache.zookeeper").setLevel(Level.WARNING);
}

if (isRedirectStdOutErr) {
Expand Down
1 change: 1 addition & 0 deletions heron/examples/src/java/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ java_binary(
"//heron/api/src/java:api-java",
"//heron/common/src/java:basics-java",
"//heron/storm/src/java:storm-compatibility-java",
"@org_apache_storm_core//jar",
],
create_executable = 0,
)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
// Copyright 2017 Twitter. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License
package com.twitter.heron.examples;

import java.util.Arrays;
import java.util.Collections;

import org.apache.storm.Config;
import org.apache.storm.generated.AlreadyAliveException;
import org.apache.storm.generated.InvalidTopologyException;
import org.apache.storm.generated.StormTopology;
import org.apache.storm.trident.TridentState;
import org.apache.storm.trident.TridentTopology;
import org.apache.storm.trident.operation.builtin.Count;
import org.apache.storm.trident.testing.FixedBatchSpout;
import org.apache.storm.trident.testing.MemoryMapState;
import org.apache.storm.trident.testing.Split;
import org.apache.storm.trident.topology.TridentTopologyBuilder;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;

import org.apache.storm.StormSubmitter;

/**
*
*/
public class TridentWordCountTopology {

public static void main(String[] args) throws AlreadyAliveException, InvalidTopologyException {
if (args.length < 1) {
throw new RuntimeException("Specify topology name");
}

int parallelism = 1;
if (args.length > 1) {
parallelism = Integer.parseInt(args[1]);
}

@SuppressWarnings("unchecked")
FixedBatchSpout spout = new FixedBatchSpout(new Fields("sentence"), 3,
new Values("the cow jumped over the moon"),
new Values("the man went to the store and bought some candy"),
new Values("four score and seven years ago"),
new Values("how many apples can you eat"));
spout.setCycle(true);

// This spout cycles through that set of sentences over and over to produce the sentence stream.
// Here's the code to do the streaming word count part of the computation:
TridentTopology topology = new TridentTopology();
TridentState wordCounts =
topology.newStream("spout1", spout)
.each(new Fields("sentence"), new Split(), new Fields("word"))
.groupBy(new Fields("word"))
.persistentAggregate(new MemoryMapState.Factory(), new Count(), new Fields("count"))
.parallelismHint(parallelism);

Config conf = new Config();
conf.setDebug(true);

// TODO: for some reason this is automatically added to spouts but not bolts...
conf.put(Config.TOPOLOGY_KRYO_REGISTER, Collections.singletonList(
"org.apache.storm.trident.topology.TransactionAttempt"));
conf.put(Config.TRANSACTIONAL_ZOOKEEPER_SERVERS, Collections.singletonList("localhost"));
conf.put(Config.TRANSACTIONAL_ZOOKEEPER_PORT, 2181);
conf.put(Config.TRANSACTIONAL_ZOOKEEPER_ROOT, "/transaction_root");
conf.put(Config.STORM_ZOOKEEPER_CONNECTION_TIMEOUT, 5000);
conf.put(Config.STORM_ZOOKEEPER_SESSION_TIMEOUT, 5000);
conf.put(Config.STORM_ZOOKEEPER_RETRY_TIMES, 5);
conf.put(Config.STORM_ZOOKEEPER_RETRY_INTERVAL, 5000);
conf.put(Config.STORM_ZOOKEEPER_RETRY_INTERVAL_CEILING, 5000);
//Config.TOPOLOGY_TRIDENT_BATCH_EMIT_INTERVAL_MILLIS
conf.put("topology.trident.batch.emit.interval.millis", 5000);

StormTopology stormTopology = topology.build();
StormSubmitter.submitTopology(args[0], conf, stormTopology);
}
}
8 changes: 7 additions & 1 deletion heron/executor/src/python/heron_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -491,7 +491,13 @@ def _get_java_instance_cmd(self, instance_info):
'-XX:+HeapDumpOnOutOfMemoryError',
'-XX:+UseConcMarkSweepGC',
'-XX:ParallelGCThreads=4',
'-Xloggc:log-files/gc.%s.log' % instance_id]
'-Xloggc:log-files/gc.%s.log' % instance_id.replace("$", "")]
if global_task_id == -1: # Used to enable debugging of specific instances during startup
instance_cmd =\
instance_cmd + ["-agentlib:jdwp=transport=dt_socket,server=y,suspend=y,address=5005"]
instance_cmd = instance_cmd + [
"-agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=500%d" % global_task_id]

instance_cmd = instance_cmd + self.instance_jvm_opts.split()
if component_name in self.component_jvm_opts:
instance_cmd = instance_cmd + self.component_jvm_opts[component_name].split()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ public static void main(String[] args) throws IOException {

// Init the logging setting and redirect the stdout and stderr to logging
// For now we just set the logging level as INFO; later we may accept an argument to set it.
Level loggingLevel = Level.INFO;
Level loggingLevel = Level.FINE;
String loggingDir = systemConfig.getHeronLoggingDirectory();

// Log to file and TMaster
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package com.twitter.heron.instance.bolt;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
Expand Down Expand Up @@ -126,8 +127,8 @@ private List<Integer> admitBoltTuple(String streamId,

sendTuple(bldr, streamId, tuple);

// TODO:- remove this after changing the api
return null;
// TODO: this used to return null. modified to make Trident work. Verify this is correct
return new ArrayList<>();
}

private void admitAckTuple(Tuple tuple) {
Expand Down
9 changes: 9 additions & 0 deletions heron/storm/src/java/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,15 @@ storm_deps_files = [
"//heron/common/src/java:basics-java",
"//heron/simulator/src/java:simulator-java",
"@com_googlecode_json_simple_json_simple//jar",
"//heron/proto:proto_topology_java", # added for trident prototype
"@commons_io_commons_io//jar", # added for trident prototype
"@commons_lang_commons_lang//jar", # added for trident prototype
"@org_apache_storm_core//jar", # added for trident prototype
"@org_clojure_clojure//jar", # added for trident prototype
"@org_slf4j_slf4j_api//jar", # added for trident prototype
"@org_slf4j_slf4j_jdk14//jar", # added for trident prototype
"@org_yaml_snakeyaml//jar", # added for trident prototype
"@org_ow2_asm_asm_all//jar", # added for trident prototype, required at runtime
"//third_party/java:kryo-neverlink",
]

Expand Down
58 changes: 58 additions & 0 deletions heron/storm/src/java/org/apache/storm/Config.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@

import org.apache.storm.serialization.IKryoDecorator;
import org.apache.storm.serialization.IKryoFactory;
import org.apache.storm.validation.ConfigValidationAnnotations;

/**
* Topology configs are specified as a plain old map. This class provides a
Expand Down Expand Up @@ -269,6 +270,63 @@ public class Config extends HashMap<String, Object> {
*/
public static final String STORM_ZOOKEEPER_PORT = "storm.zookeeper.port";

/**
* The ceiling of the interval between retries of a Zookeeper operation.
*/
@ConfigValidationAnnotations.isInteger
public static final String STORM_ZOOKEEPER_RETRY_INTERVAL_CEILING="storm.zookeeper.retry.intervalceiling.millis";

/**
* A list of hosts of Exhibitor servers used to discover/maintain connection to ZooKeeper cluster.
* Any configured ZooKeeper servers will be used for the curator/exhibitor backup connection string.
*/
@ConfigValidationAnnotations.isStringList
public static final String STORM_EXHIBITOR_SERVERS = "storm.exhibitor.servers";

/**
* The port Storm will use to connect to each of the exhibitor servers.
*/
@ConfigValidationAnnotations.isInteger
@ConfigValidationAnnotations.isPositiveNumber
public static final String STORM_EXHIBITOR_PORT = "storm.exhibitor.port";

/*
* How often to poll Exhibitor cluster in millis.
*/
@ConfigValidationAnnotations.isString
public static final String STORM_EXHIBITOR_URIPATH="storm.exhibitor.poll.uripath";

/**
* How often to poll Exhibitor cluster in millis.
*/
@ConfigValidationAnnotations.isInteger
public static final String STORM_EXHIBITOR_POLL="storm.exhibitor.poll.millis";

/**
* The number of times to retry an Exhibitor operation.
*/
@ConfigValidationAnnotations.isInteger
public static final String STORM_EXHIBITOR_RETRY_TIMES="storm.exhibitor.retry.times";

/**
* The interval between retries of an Exhibitor operation.
*/
@ConfigValidationAnnotations.isInteger
public static final String STORM_EXHIBITOR_RETRY_INTERVAL="storm.exhibitor.retry.interval";

/**
* The ceiling of the interval between retries of an Exhibitor operation.
*/
@ConfigValidationAnnotations.isInteger
public static final String STORM_EXHIBITOR_RETRY_INTERVAL_CEILING="storm.exhibitor.retry.intervalceiling.millis";

/**
* How often a batch can be emitted in a Trident topology.
*/
@ConfigValidationAnnotations.isInteger
@ConfigValidationAnnotations.isPositiveNumber
public static final String TOPOLOGY_TRIDENT_BATCH_EMIT_INTERVAL_MILLIS="topology.trident.batch.emit.interval.millis";

/**
* The root directory in ZooKeeper for metadata about TransactionalSpouts.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,10 @@

package org.apache.storm.generated;

public class GlobalStreamId {
import java.io.Serializable;

public class GlobalStreamId implements Serializable {
private static final long serialVersionUID = 1873909238460677921L;
private String componentId; // required
private String streamId; // required

Expand Down Expand Up @@ -60,4 +63,28 @@ public void set_streamId(String newStreamId) {
public void unset_streamId() {
this.streamId = null;
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;

GlobalStreamId that = (GlobalStreamId) o;

if (componentId != null ? !componentId.equals(that.componentId) : that.componentId != null)
return false;
return streamId != null ? streamId.equals(that.streamId) : that.streamId == null;
}

@Override
public int hashCode() {
int result = componentId != null ? componentId.hashCode() : 0;
result = 31 * result + (streamId != null ? streamId.hashCode() : 0);
return result;
}

@Override
public String toString() {
return "GlobalStreamId{componentId='" + componentId + "', streamId='" + streamId + "'}";
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,20 @@

package org.apache.storm.task;

import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

import org.apache.storm.generated.GlobalStreamId;
import org.apache.storm.generated.Grouping;
import org.apache.storm.generated.StormTopology;
import org.apache.storm.tuple.Fields;
import org.json.simple.JSONAware;

import com.twitter.heron.api.generated.TopologyAPI;

// import org.apache.storm.generated.ComponentCommon;
// import org.apache.storm.generated.GlobalStreamId;
// import org.apache.storm.generated.Grouping;
Expand Down Expand Up @@ -104,10 +110,9 @@ public Fields getComponentOutputFields(String componentId, String streamId) {
/**
* Gets the declared output fields for the specified global stream id.
*/
/*
public Fields getComponentOutputFields(GlobalStreamId id) {
return getComponentOutputFields(id.get_componentId(), id.get_streamId());
}
*/

/**
* Gets the declared inputs to the specified component.
Expand All @@ -120,6 +125,61 @@ public Map<GlobalStreamId, Grouping> getSources(String componentId) {
}
*/

// TODO: this is total jank
public Map<GlobalStreamId, Grouping> getSources(String componentId) {
Map<TopologyAPI.StreamId, TopologyAPI.Grouping> heron = delegate.getSources(componentId);
Map<GlobalStreamId, Grouping> converted = new NoValueMap<>();
for (TopologyAPI.StreamId heronStreamId : heron.keySet()) {
converted.put(new GlobalStreamId(heronStreamId.getComponentName(), heronStreamId.getId()), null);
}
return converted;
}

// TODO: this is total jank
private class NoValueMap<K, V> extends HashMap<K, V> {
private static final long serialVersionUID = -729425631561874300L;

@Override
public Collection<V> values() {
throw new RuntimeException("Values not supported. Map only being used in context where callers want keyset");
}

@Override
public Set<Entry<K, V>> entrySet() {
throw new RuntimeException("Values not supported. Map only being used in context where callers want keyset");
}

@Override
public V get(Object key) {
throw new RuntimeException("Values not supported. Map only being used in context where callers want keyset");
}
}

/**
* Gets information about who is consuming the outputs of the specified component,
* and how.
*
* @return Map from stream id to component id to the Grouping used.
*/
// public Map<String, Map<String, TopologyAPI.Grouping>> getHeronTargets(String componentId) {
// return delegate.getTargets(componentId);
// }

public Map<String, Map<String, Grouping>> getTargets(String componentId) {
Map<String, Map<String, TopologyAPI.Grouping>> heron = delegate.getTargets(componentId);
Map<String, Map<String, Grouping>> converted = new HashMap<>();
for (String streamId : heron.keySet()) {
// TODO: this is total jank
Map<String, TopologyAPI.Grouping> heronGrouping = heron.get(streamId);
HashMap<String, Grouping> groupingConverted = new NoValueMap<>();
for (String key : heronGrouping.keySet()) {
groupingConverted.put(key, null);
}
converted.put(streamId, groupingConverted);
}
return converted;
}

/**
* Gets information about who is consuming the outputs of the specified component,
* and how.
Expand Down
Loading