From 400caa4c074464710df33290b60ac61beedd5792 Mon Sep 17 00:00:00 2001 From: Josh Fischer Date: Tue, 9 Jan 2018 11:09:24 -0600 Subject: [PATCH] Joshfischer/eco config (#2658) * initial addition of directory structure * clean up * adding base config for build file \ fixing linting issues * saving progress. Still need to fix missing class definition on ParseException * fixing shaded dependency issue * starting experimentation on how to approach loading config from yaml file * experimenting with snake yaml to extract topology defintion * adding start of topology and component definitions * refactor * successfully populating ecoTopologyDefinition from yaml * clean up * clean up * fixed versioning error * saving progress, still working through building topology from yaml * saving progress * cleaning up styles * checking default config values * adding test for customized config map * transitioning to support heron api * removing last of streamlet api from eco * making a pivot to match storm flux behavior better * saving before chaos * saving progress, running into issue with verifying the toplogy * adding some logging for debugging * printing topology definition. Adding tests * saving with failing test, still working through investigation * back to working * saving progress * finishing eco parser test * handling exception * package change * package change * adding missed id * MVP of ECO at this commit * fixing styles, pulling topology name from topology definition file * fixing spelling * clean up * clean up * refatoring eco builder package * start to removing static calls * removing static calls * still working through linting errors * fixing lint errors * fixing setter * adding test * start to verfiying bean references * verifying components * fixing tests * worked passed constructor argument issue. Now need to deal with backtypes from Storm * saving before restructure * building kafka topology * fixing imports for test * fixing folder structure * refactor for testability * removing more statics * cleaning up eco builders * confirmation config builder maps as expected * finishing configBuilder test * cleaning up BoltBuilder * verfiying component builder * replacing java logger * adding tests * adding verification of building implementations of IRichBolts * clean up * adding mocks for testing * fixing classNotFound errors * coverage on all of stream bulder * need to refactor ObjectBuilder * moving static call * adding verifcation to test * clean up * clean up * verifying path of building objects with args * covering other path * starting on builder utility * verifying className * adding test * adding test * clean up * adding test * clean up * clean up * fixing checkstyle errors * fixing linting issues * fixing linting issues * fixing linting issues * fixing linting issues * linting issues * fixing checkstyle * fixing checkstyle * saving progress * adding simple wordcount eco example * adding examples * checkstyles --- examples/src/java/BUILD | 21 + .../heron/examples/eco/LogInfoBolt.java | 41 ++ .../heron/examples/eco/TestNameCounter.java | 61 +++ .../heron/examples/eco/TestNameSpout.java | 81 ++++ .../heron/examples/eco/TestPrintBolt.java | 33 ++ .../heron/examples/eco/TestWindowBolt.java | 46 ++ examples/src/resources/simple_windowing.yaml | 67 +++ examples/src/resources/simple_wordcount.yaml | 60 +++ heron/eco/src/java/BUILD | 34 ++ .../src/java/com/twitter/heron/eco/Eco.java | 158 +++++++ .../heron/eco/builder/BoltBuilder.java | 35 ++ .../heron/eco/builder/BuilderUtility.java | 116 +++++ .../heron/eco/builder/ComponentBuilder.java | 36 ++ .../heron/eco/builder/ConfigBuilder.java | 32 ++ .../twitter/heron/eco/builder/EcoBuilder.java | 66 +++ .../heron/eco/builder/ObjectBuilder.java | 311 +++++++++++++ .../heron/eco/builder/SpoutBuilder.java | 41 ++ .../heron/eco/builder/StreamBuilder.java | 129 ++++++ .../heron/eco/definition/BeanDefinition.java | 27 ++ .../eco/definition/BeanListReference.java | 30 ++ .../heron/eco/definition/BeanReference.java | 30 ++ .../heron/eco/definition/BoltDefinition.java | 17 + .../heron/eco/definition/ComponentStream.java | 67 +++ .../ConfigurationMethodDefinition.java | 64 +++ .../eco/definition/EcoExecutionContext.java | 112 +++++ .../eco/definition/EcoTopologyDefinition.java | 108 +++++ .../eco/definition/GroupingDefinition.java | 65 +++ .../eco/definition/ObjectDefinition.java | 106 +++++ .../eco/definition/PropertyDefinition.java | 55 +++ .../heron/eco/definition/SpoutDefinition.java | 17 + .../eco/definition/StreamDefinition.java | 63 +++ .../twitter/heron/eco/parser/EcoParser.java | 53 +++ .../heron/eco/submit/EcoSubmitter.java | 28 ++ heron/eco/tests/java/BUILD | 90 ++++ .../java/com/twitter/heron/eco/EcoTest.java | 91 ++++ .../heron/eco/builder/BoltBuilderTest.java | 89 ++++ .../heron/eco/builder/BuilderUtilityTest.java | 139 ++++++ .../eco/builder/ComponentBuilderTest.java | 89 ++++ .../heron/eco/builder/ConfigBuilderTest.java | 63 +++ .../heron/eco/builder/EcoBuilderTest.java | 127 ++++++ .../heron/eco/builder/ObjectBuilderTest.java | 159 +++++++ .../heron/eco/builder/SpoutBuilderTest.java | 159 +++++++ .../heron/eco/builder/StreamBuilderTest.java | 370 ++++++++++++++++ .../heron/eco/parser/EcoParserTest.java | 407 ++++++++++++++++++ .../heron/eco/submit/EcoSubmitterTest.java | 55 +++ .../metricsmgr/executor/SinkExecutorTest.java | 2 +- .../metricsmgr/metrics/MetricsRecordTest.java | 2 +- scripts/get_all_heron_paths.sh | 2 +- 48 files changed, 4051 insertions(+), 3 deletions(-) create mode 100644 examples/src/java/com/twitter/heron/examples/eco/LogInfoBolt.java create mode 100644 examples/src/java/com/twitter/heron/examples/eco/TestNameCounter.java create mode 100644 examples/src/java/com/twitter/heron/examples/eco/TestNameSpout.java create mode 100644 examples/src/java/com/twitter/heron/examples/eco/TestPrintBolt.java create mode 100644 examples/src/java/com/twitter/heron/examples/eco/TestWindowBolt.java create mode 100644 examples/src/resources/simple_windowing.yaml create mode 100644 examples/src/resources/simple_wordcount.yaml create mode 100644 heron/eco/src/java/BUILD create mode 100644 heron/eco/src/java/com/twitter/heron/eco/Eco.java create mode 100644 heron/eco/src/java/com/twitter/heron/eco/builder/BoltBuilder.java create mode 100644 heron/eco/src/java/com/twitter/heron/eco/builder/BuilderUtility.java create mode 100644 heron/eco/src/java/com/twitter/heron/eco/builder/ComponentBuilder.java create mode 100644 heron/eco/src/java/com/twitter/heron/eco/builder/ConfigBuilder.java create mode 100644 heron/eco/src/java/com/twitter/heron/eco/builder/EcoBuilder.java create mode 100644 heron/eco/src/java/com/twitter/heron/eco/builder/ObjectBuilder.java create mode 100644 heron/eco/src/java/com/twitter/heron/eco/builder/SpoutBuilder.java create mode 100644 heron/eco/src/java/com/twitter/heron/eco/builder/StreamBuilder.java create mode 100644 heron/eco/src/java/com/twitter/heron/eco/definition/BeanDefinition.java create mode 100644 heron/eco/src/java/com/twitter/heron/eco/definition/BeanListReference.java create mode 100644 heron/eco/src/java/com/twitter/heron/eco/definition/BeanReference.java create mode 100644 heron/eco/src/java/com/twitter/heron/eco/definition/BoltDefinition.java create mode 100644 heron/eco/src/java/com/twitter/heron/eco/definition/ComponentStream.java create mode 100644 heron/eco/src/java/com/twitter/heron/eco/definition/ConfigurationMethodDefinition.java create mode 100644 heron/eco/src/java/com/twitter/heron/eco/definition/EcoExecutionContext.java create mode 100644 heron/eco/src/java/com/twitter/heron/eco/definition/EcoTopologyDefinition.java create mode 100644 heron/eco/src/java/com/twitter/heron/eco/definition/GroupingDefinition.java create mode 100644 heron/eco/src/java/com/twitter/heron/eco/definition/ObjectDefinition.java create mode 100644 heron/eco/src/java/com/twitter/heron/eco/definition/PropertyDefinition.java create mode 100644 heron/eco/src/java/com/twitter/heron/eco/definition/SpoutDefinition.java create mode 100644 heron/eco/src/java/com/twitter/heron/eco/definition/StreamDefinition.java create mode 100644 heron/eco/src/java/com/twitter/heron/eco/parser/EcoParser.java create mode 100644 heron/eco/src/java/com/twitter/heron/eco/submit/EcoSubmitter.java create mode 100644 heron/eco/tests/java/BUILD create mode 100644 heron/eco/tests/java/com/twitter/heron/eco/EcoTest.java create mode 100644 heron/eco/tests/java/com/twitter/heron/eco/builder/BoltBuilderTest.java create mode 100644 heron/eco/tests/java/com/twitter/heron/eco/builder/BuilderUtilityTest.java create mode 100644 heron/eco/tests/java/com/twitter/heron/eco/builder/ComponentBuilderTest.java create mode 100644 heron/eco/tests/java/com/twitter/heron/eco/builder/ConfigBuilderTest.java create mode 100644 heron/eco/tests/java/com/twitter/heron/eco/builder/EcoBuilderTest.java create mode 100644 heron/eco/tests/java/com/twitter/heron/eco/builder/ObjectBuilderTest.java create mode 100644 heron/eco/tests/java/com/twitter/heron/eco/builder/SpoutBuilderTest.java create mode 100644 heron/eco/tests/java/com/twitter/heron/eco/builder/StreamBuilderTest.java create mode 100644 heron/eco/tests/java/com/twitter/heron/eco/parser/EcoParserTest.java create mode 100644 heron/eco/tests/java/com/twitter/heron/eco/submit/EcoSubmitterTest.java diff --git a/examples/src/java/BUILD b/examples/src/java/BUILD index 39946036b5f..4a4772f6195 100644 --- a/examples/src/java/BUILD +++ b/examples/src/java/BUILD @@ -37,3 +37,24 @@ genrule( outs = ["heron-streamlet-examples.jar"], cmd = "cp $< $@", ) + + +java_binary( + name='eco-examples-unshaded', + srcs = glob(["com/twitter/heron/examples/eco/**/*.java"]), + deps = [ + "//heron/api/src/java:api-java-low-level", + "//heron/api/src/java:api-java", + "//heron/common/src/java:basics-java", + "//heron/eco/src/java:eco-core", + "//storm-compatibility/src/java:storm-compatibility-java", + ], + create_executable = 0, +) + +genrule( + name = 'heron-eco-examples', + srcs = [":eco-examples-unshaded_deploy.jar"], + outs = ["heron-eco-examples.jar"], + cmd = "cp $< $@", +) \ No newline at end of file diff --git a/examples/src/java/com/twitter/heron/examples/eco/LogInfoBolt.java b/examples/src/java/com/twitter/heron/examples/eco/LogInfoBolt.java new file mode 100644 index 00000000000..32711786384 --- /dev/null +++ b/examples/src/java/com/twitter/heron/examples/eco/LogInfoBolt.java @@ -0,0 +1,41 @@ +// Copyright 2018 Twitter. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +package com.twitter.heron.examples.eco; + + +import java.util.logging.Logger; + +import org.apache.storm.topology.BasicOutputCollector; +import org.apache.storm.topology.OutputFieldsDeclarer; +import org.apache.storm.topology.base.BaseBasicBolt; +import org.apache.storm.tuple.Tuple; + +/** + * Simple bolt that does nothing other than LOG.info() every tuple received. + * + */ +@SuppressWarnings("serial") +public class LogInfoBolt extends BaseBasicBolt { + private static final Logger LOG = Logger.getLogger(LogInfoBolt.class.getName()); + + @Override + public void execute(Tuple tuple, BasicOutputCollector basicOutputCollector) { + LOG.info("{ }" + tuple); + } + + @Override + public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) { + + } +} diff --git a/examples/src/java/com/twitter/heron/examples/eco/TestNameCounter.java b/examples/src/java/com/twitter/heron/examples/eco/TestNameCounter.java new file mode 100644 index 00000000000..00b5f64e268 --- /dev/null +++ b/examples/src/java/com/twitter/heron/examples/eco/TestNameCounter.java @@ -0,0 +1,61 @@ +// Copyright 2018 Twitter. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +package com.twitter.heron.examples.eco; + +import java.util.HashMap; +import java.util.Map; + +import org.apache.storm.task.TopologyContext; +import org.apache.storm.topology.BasicOutputCollector; +import org.apache.storm.topology.OutputFieldsDeclarer; +import org.apache.storm.topology.base.BaseBasicBolt; +import org.apache.storm.tuple.Fields; +import org.apache.storm.tuple.Tuple; + +import static com.twitter.heron.api.utils.Utils.tuple; +@SuppressWarnings({"serial", "rawtypes"}) +public class TestNameCounter extends BaseBasicBolt { + + private Map counts; + + @Override + public void prepare(Map map, TopologyContext topologyContext) { + counts = new HashMap<>(); + } + + + protected String getTupleValue(Tuple t, int idx) { + return (String) t.getValues().get(idx); + } + + public void execute(Tuple input, BasicOutputCollector collector) { + String word = getTupleValue(input, 0); + int count = 0; + if (counts.containsKey(word)) { + count = counts.get(word); + } + count++; + counts.put(word, count); + collector.emit(tuple(word, count)); + } + + public void cleanup() { + + } + + public void declareOutputFields(OutputFieldsDeclarer declarer) { + declarer.declare(new Fields("name", "count")); + } + +} diff --git a/examples/src/java/com/twitter/heron/examples/eco/TestNameSpout.java b/examples/src/java/com/twitter/heron/examples/eco/TestNameSpout.java new file mode 100644 index 00000000000..4e7e8a58864 --- /dev/null +++ b/examples/src/java/com/twitter/heron/examples/eco/TestNameSpout.java @@ -0,0 +1,81 @@ +// Copyright 2017 Twitter. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +package com.twitter.heron.examples.eco; + +import java.util.HashMap; +import java.util.Map; +import java.util.Random; + +import org.apache.storm.Config; +import org.apache.storm.spout.SpoutOutputCollector; +import org.apache.storm.task.TopologyContext; +import org.apache.storm.topology.OutputFieldsDeclarer; +import org.apache.storm.topology.base.BaseRichSpout; +import org.apache.storm.tuple.Fields; +import org.apache.storm.tuple.Values; +import org.apache.storm.utils.Utils; + +@SuppressWarnings({"serial", "HiddenField"}) +public class TestNameSpout extends BaseRichSpout { + private boolean isdistributed; + private SpoutOutputCollector collector; + + public TestNameSpout() { + this(true); + } + + public TestNameSpout(boolean isDistributed) { + isdistributed = isDistributed; + } + + public void open(Map conf, TopologyContext context, + SpoutOutputCollector collector) { + this.collector = collector; + } + + public void close() { + + } + + public void nextTuple() { + Utils.sleep(100); + final String[] words = new String[] {"marge", "homer", "bart", "simpson", "lisa"}; + final Random rand = new Random(); + final String word = words[rand.nextInt(words.length)]; + collector.emit(new Values(word)); + } + + public void ack(Object msgId) { + + } + + public void fail(Object msgId) { + + } + + public void declareOutputFields(OutputFieldsDeclarer declarer) { + declarer.declare(new Fields("name")); + } + + @Override + public Map getComponentConfiguration() { + if (!isdistributed) { + Map ret = new HashMap(); + ret.put(Config.TOPOLOGY_WORKERS, 1); + return ret; + } else { + return null; + } + } +} diff --git a/examples/src/java/com/twitter/heron/examples/eco/TestPrintBolt.java b/examples/src/java/com/twitter/heron/examples/eco/TestPrintBolt.java new file mode 100644 index 00000000000..f9cb1d693af --- /dev/null +++ b/examples/src/java/com/twitter/heron/examples/eco/TestPrintBolt.java @@ -0,0 +1,33 @@ +// Copyright 2018 Twitter. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +package com.twitter.heron.examples.eco; + +import org.apache.storm.topology.BasicOutputCollector; +import org.apache.storm.topology.OutputFieldsDeclarer; +import org.apache.storm.topology.base.BaseBasicBolt; +import org.apache.storm.tuple.Tuple; + +@SuppressWarnings("serial") +public class TestPrintBolt extends BaseBasicBolt { + + @Override + public void execute(Tuple tuple, BasicOutputCollector collector) { + System.out.println(tuple); + } + + @Override + public void declareOutputFields(OutputFieldsDeclarer ofd) { + } + +} diff --git a/examples/src/java/com/twitter/heron/examples/eco/TestWindowBolt.java b/examples/src/java/com/twitter/heron/examples/eco/TestWindowBolt.java new file mode 100644 index 00000000000..ef6046cd520 --- /dev/null +++ b/examples/src/java/com/twitter/heron/examples/eco/TestWindowBolt.java @@ -0,0 +1,46 @@ +// Copyright 2018 Twitter. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +package com.twitter.heron.examples.eco; + +import java.util.Map; + +import org.apache.storm.task.OutputCollector; +import org.apache.storm.task.TopologyContext; +import org.apache.storm.topology.OutputFieldsDeclarer; +import org.apache.storm.topology.base.BaseWindowedBolt; +import org.apache.storm.tuple.Fields; +import org.apache.storm.tuple.Values; +import org.apache.storm.windowing.TupleWindow; + +@SuppressWarnings({"serial", "HiddenField"}) +public class TestWindowBolt extends BaseWindowedBolt { + private OutputCollector collector; + + + @Override + public void prepare(Map topoConf, TopologyContext context, + OutputCollector collector) { + this.collector = collector; + } + + @Override + public void execute(TupleWindow inputWindow) { + collector.emit(new Values(inputWindow.get().size())); + } + + @Override + public void declareOutputFields(OutputFieldsDeclarer declarer) { + declarer.declare(new Fields("count")); + } +} diff --git a/examples/src/resources/simple_windowing.yaml b/examples/src/resources/simple_windowing.yaml new file mode 100644 index 00000000000..e860b6458eb --- /dev/null +++ b/examples/src/resources/simple_windowing.yaml @@ -0,0 +1,67 @@ +# Copyright 2017 Twitter. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http:#www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +--- + +name: "sliding-window-topology" + +components: + - id: "windowLength" + className: "org.apache.storm.topology.base.BaseWindowedBolt$Count" + constructorArgs: + - 5 + - id: "slidingInterval" + className: "org.apache.storm.topology.base.BaseWindowedBolt$Count" + constructorArgs: + - 3 + +config: + topology.workers: 1 + +# spout definitions +spouts: + - id: "spout-1" + className: "com.twitter.heron.examples.eco.TestNameSpout" + parallelism: 1 + +# bolt definitions +bolts: + - id: "bolt-1" + className: "com.twitter.heron.examples.eco.TestWindowBolt" + configMethods: + - name: "withWindow" + args: [ref: "windowLength", ref: "slidingInterval"] + parallelism: 1 + - id: "bolt-2" + className: "com.twitter.heron.examples.eco.TestPrintBolt" + parallelism: 1 + + +#stream definitions +# stream definitions define connections between spouts and bolts. +# note that such connections can be cyclical +streams: + - name: "spout-1 --> bolt-1" # name isn't used (placeholder for logging, UI, etc.) +# id: "connection-1" + from: "spout-1" + to: "bolt-1" + grouping: + type: FIELDS + args: ["word"] + - name: "bolt-1 --> bolt-2" # name isn't used (placeholder for logging, UI, etc.) +# id: "connection-1" + from: "bolt-1" + to: "bolt-2" + grouping: + type: SHUFFLE \ No newline at end of file diff --git a/examples/src/resources/simple_wordcount.yaml b/examples/src/resources/simple_wordcount.yaml new file mode 100644 index 00000000000..d28eec244ee --- /dev/null +++ b/examples/src/resources/simple_wordcount.yaml @@ -0,0 +1,60 @@ +# Copyright 2017 Twitter. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http:#www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +--- + +# topology definition +# name to be used when submitting +name: "simple-wordcount-topology" + +# topology configuration +# this will be passed to the submitter as a map of config options +# +config: + topology.workers: 1 + +# spout definitions +spouts: + - id: "spout-1" + className: "com.twitter.heron.examples.eco.TestNameSpout" + parallelism: 1 + +# bolt definitions +bolts: + - id: "bolt-1" + className: "com.twitter.heron.examples.eco.TestNameCounter" + parallelism: 1 + + - id: "bolt-2" + className: "com.twitter.heron.examples.eco.LogInfoBolt" + parallelism: 1 + +#stream definitions +# stream definitions define connections between spouts and bolts. +# note that such connections can be cyclical +streams: + - name: "spout-1 --> bolt-1" # name isn't used (placeholder for logging, UI, etc.) + id: "connection-1" + from: "spout-1" + to: "bolt-1" + grouping: + type: FIELDS + args: ["word"] + + - name: "bolt-1 --> bolt2" + id: "connection-2" + from: "bolt-1" + to: "bolt-2" + grouping: + type: SHUFFLE \ No newline at end of file diff --git a/heron/eco/src/java/BUILD b/heron/eco/src/java/BUILD new file mode 100644 index 00000000000..ec26955f78d --- /dev/null +++ b/heron/eco/src/java/BUILD @@ -0,0 +1,34 @@ +licenses(["notice"]) + +package(default_visibility = ["//visibility:public"]) + +eco_deps = [ +"@commons_cli_commons_cli//jar", +"@org_yaml_snakeyaml//jar", +"//third_party/java:logging", +"//storm-compatibility/src/java:storm-compatibility-java", +"//heron/api/src/java:api-java-low-level", +"//heron/common/src/java:basics-java", +] + + +java_binary( + name="eco-core-shaded", + srcs = glob(["com/twitter/heron/eco/**/*.java"]), + deps = eco_deps, + create_executable = 0, + +) + +java_library( + name = "eco-java", + srcs = glob(["com/twitter/heron/eco/**/*.java"]), + deps = eco_deps +) + +genrule( + name = "eco-core", + srcs = [":eco-core-shaded_deploy.jar"], + outs = ["eco-core.jar"], + cmd = "cp $< $@", +) diff --git a/heron/eco/src/java/com/twitter/heron/eco/Eco.java b/heron/eco/src/java/com/twitter/heron/eco/Eco.java new file mode 100644 index 00000000000..cc30489665f --- /dev/null +++ b/heron/eco/src/java/com/twitter/heron/eco/Eco.java @@ -0,0 +1,158 @@ +// Copyright 2017 Twitter. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +package com.twitter.heron.eco; + +import java.io.File; +import java.io.FileInputStream; +import java.util.logging.Logger; + +import org.apache.commons.cli.CommandLine; +import org.apache.commons.cli.CommandLineParser; +import org.apache.commons.cli.DefaultParser; +import org.apache.commons.cli.Option; +import org.apache.commons.cli.Options; +import org.apache.commons.cli.ParseException; +import org.apache.storm.Config; +import org.apache.storm.topology.TopologyBuilder; + +import com.twitter.heron.eco.builder.BoltBuilder; +import com.twitter.heron.eco.builder.BuilderUtility; +import com.twitter.heron.eco.builder.ComponentBuilder; +import com.twitter.heron.eco.builder.ConfigBuilder; +import com.twitter.heron.eco.builder.EcoBuilder; +import com.twitter.heron.eco.builder.ObjectBuilder; +import com.twitter.heron.eco.builder.SpoutBuilder; +import com.twitter.heron.eco.builder.StreamBuilder; +import com.twitter.heron.eco.definition.BoltDefinition; +import com.twitter.heron.eco.definition.EcoExecutionContext; +import com.twitter.heron.eco.definition.EcoTopologyDefinition; +import com.twitter.heron.eco.definition.SpoutDefinition; +import com.twitter.heron.eco.definition.StreamDefinition; +import com.twitter.heron.eco.parser.EcoParser; +import com.twitter.heron.eco.submit.EcoSubmitter; + + +public class Eco { + + private static final Logger LOG = Logger.getLogger(Eco.class.getName()); + + private EcoBuilder ecoBuilder; + private EcoParser ecoParser; + private EcoSubmitter ecoSubmitter; + + public Eco(EcoBuilder ecoBuilder, EcoParser ecoParser, EcoSubmitter ecoSubmitter) { + this.ecoBuilder = ecoBuilder; + this.ecoParser = ecoParser; + this.ecoSubmitter = ecoSubmitter; + } + + public void submit(FileInputStream fileInputStream) throws Exception { + EcoTopologyDefinition topologyDefinition = ecoParser.parseFromInputStream(fileInputStream); + + String topologyName = topologyDefinition.getName(); + + Config topologyConfig = ecoBuilder + .buildConfig(topologyDefinition); + + EcoExecutionContext executionContext + = new EcoExecutionContext(topologyDefinition, topologyConfig); + + printTopologyInfo(executionContext); + + ObjectBuilder objectBuilder = new ObjectBuilder(); + objectBuilder.setBuilderUtility(new BuilderUtility()); + TopologyBuilder builder = ecoBuilder + .buildTopologyBuilder(executionContext, objectBuilder); + + ecoSubmitter.submitTopology(topologyName, topologyConfig, builder.createTopology()); + } + + public static void main(String[] args) throws Exception { + Options options = constructOptions(); + + CommandLineParser parser = new DefaultParser(); + + CommandLine cmd; + try { + cmd = parser.parse(options, args); + } catch (ParseException e) { + throw new RuntimeException("Error parsing command line options: ", e); + } + + FileInputStream fin = new FileInputStream(new File(cmd.getOptionValue("eco-config-file"))); + + Eco eco = new Eco( + new EcoBuilder( + new SpoutBuilder(), + new BoltBuilder(), + new StreamBuilder(), + new ComponentBuilder(), + new ConfigBuilder()), + new EcoParser(), + new EcoSubmitter()); + + eco.submit(fin); + } + + private static Options constructOptions() { + Options options = new Options(); + Option ecoConfig = Option.builder("eco") + .desc("Yaml config file for specifying topology definitions") + .longOpt("eco-config-file") + .hasArgs() + .argName("eco-config-file") + .required() + .build(); + options.addOption(ecoConfig); + return options; + } + + // construct command line help options + //TODO: (joshfischer) integrate with existing system somehow + private static Options constructHelpOptions() { + Options options = new Options(); + Option help = Option.builder("h") + .desc("List all options and their description") + .longOpt("help") + .build(); + + options.addOption(help); + return options; + } + + static void printTopologyInfo(EcoExecutionContext ctx) { + EcoTopologyDefinition t = ctx.getTopologyDefinition(); + + LOG.info("---------- TOPOLOGY DETAILS ----------"); + + LOG.info(String.format("Topology Name: %s", t.getName())); + LOG.info("--------------- SPOUTS ---------------"); + for (SpoutDefinition s : t.getSpouts()) { + LOG.info(String.format("%s [%d] (%s)", s.getId(), s.getParallelism(), s.getClassName())); + } + LOG.info("---------------- BOLTS ---------------"); + for (BoltDefinition b : t.getBolts()) { + LOG.info(String.format("%s [%d] (%s)", b.getId(), b.getParallelism(), b.getClassName())); + } + + LOG.info("--------------- STREAMS ---------------"); + for (StreamDefinition sd : t.getStreams()) { + LOG.info(String.format("%s --%s--> %s", + sd.getFrom(), + sd.getGrouping().getType(), + sd.getTo())); + } + LOG.info("--------------------------------------"); + } +} diff --git a/heron/eco/src/java/com/twitter/heron/eco/builder/BoltBuilder.java b/heron/eco/src/java/com/twitter/heron/eco/builder/BoltBuilder.java new file mode 100644 index 00000000000..6af6ac9b90f --- /dev/null +++ b/heron/eco/src/java/com/twitter/heron/eco/builder/BoltBuilder.java @@ -0,0 +1,35 @@ +// Copyright 2017 Twitter. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +package com.twitter.heron.eco.builder; + +import java.lang.reflect.InvocationTargetException; + +import com.twitter.heron.eco.definition.EcoExecutionContext; +import com.twitter.heron.eco.definition.EcoTopologyDefinition; +import com.twitter.heron.eco.definition.ObjectDefinition; + +public class BoltBuilder { + + protected void buildBolts(EcoExecutionContext executionContext, + ObjectBuilder objectBuilder) + throws IllegalAccessException, InstantiationException, ClassNotFoundException, + NoSuchFieldException, InvocationTargetException { + EcoTopologyDefinition topologyDefinition = executionContext.getTopologyDefinition(); + + for (ObjectDefinition def: topologyDefinition.getBolts()) { + Object obj = objectBuilder.buildObject(def, executionContext); + executionContext.addBolt(def.getId(), obj); + } + } +} diff --git a/heron/eco/src/java/com/twitter/heron/eco/builder/BuilderUtility.java b/heron/eco/src/java/com/twitter/heron/eco/builder/BuilderUtility.java new file mode 100644 index 00000000000..a0ed5265cd0 --- /dev/null +++ b/heron/eco/src/java/com/twitter/heron/eco/builder/BuilderUtility.java @@ -0,0 +1,116 @@ +// Copyright 2018 Twitter. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +package com.twitter.heron.eco.builder; + +import java.lang.reflect.Field; +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; +import java.util.ArrayList; +import java.util.List; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.twitter.heron.eco.definition.BeanListReference; +import com.twitter.heron.eco.definition.BeanReference; +import com.twitter.heron.eco.definition.EcoExecutionContext; +import com.twitter.heron.eco.definition.ObjectDefinition; +import com.twitter.heron.eco.definition.PropertyDefinition; + +public class BuilderUtility { + + private static final Logger LOG = LoggerFactory.getLogger(BuilderUtility.class); + + @SuppressWarnings("rawtypes") + protected List resolveReferences(List args, EcoExecutionContext context) { + LOG.debug("Checking arguments for references."); + List cArgs = new ArrayList<>(); + + // resolve references + for (Object arg : args) { + if (arg instanceof BeanReference) { + LOG.debug("BeanReference: " + ((BeanReference) arg).getId()); + cArgs.add(context.getComponent(((BeanReference) arg).getId())); + } else if (arg instanceof BeanListReference) { + List components = new ArrayList<>(); + BeanListReference ref = (BeanListReference) arg; + for (String id : ref.getIds()) { + components.add(context.getComponent(id)); + } + + LOG.debug("BeanListReference resolved as {}" + components); + cArgs.add(components); + } else { + LOG.debug("Unknown:" + arg.toString()); + cArgs.add(arg); + } + } + return cArgs; + } + + @SuppressWarnings("rawtypes") + protected void applyProperties(ObjectDefinition bean, Object instance, + EcoExecutionContext context) throws + IllegalAccessException, InvocationTargetException, NoSuchFieldException { + List props = bean.getProperties(); + Class clazz = instance.getClass(); + if (props != null) { + for (PropertyDefinition prop : props) { + Object value = prop.isReference() ? context.getComponent(prop.getRef()) : prop.getValue(); + Method setter = findSetter(clazz, prop.getName()); + if (setter != null) { + LOG.info("found setter, attempting with: " + instance.getClass() + " " + value); + // invoke setter + setter.invoke(instance, new Object[]{value}); + } else { + // look for a public instance variable + LOG.debug("no setter found. Looking for a public instance variable..."); + Field field = findPublicField(clazz, prop.getName()); + if (field != null) { + field.set(instance, value); + } + } + } + } + } + + @SuppressWarnings("rawtypes") + protected Field findPublicField(Class clazz, String property) + throws NoSuchFieldException { + Field field = clazz.getField(property); + return field; + } + + @SuppressWarnings("rawtypes") + private Method findSetter(Class clazz, String property) { + String setterName = toSetterName(property); + Method retval = null; + Method[] methods = clazz.getMethods(); + for (Method method : methods) { + if (setterName.equals(method.getName())) { + LOG.debug("Found setter method: " + method.getName()); + retval = method; + } + } + return retval; + } + + protected String toSetterName(String name) { + return "set" + name.substring(0, 1).toUpperCase() + name.substring(1, name.length()); + } + + protected Class classForName(String className) throws ClassNotFoundException { + return Class.forName(className); + } +} diff --git a/heron/eco/src/java/com/twitter/heron/eco/builder/ComponentBuilder.java b/heron/eco/src/java/com/twitter/heron/eco/builder/ComponentBuilder.java new file mode 100644 index 00000000000..9fe602303ad --- /dev/null +++ b/heron/eco/src/java/com/twitter/heron/eco/builder/ComponentBuilder.java @@ -0,0 +1,36 @@ +// Copyright 2017 Twitter. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +package com.twitter.heron.eco.builder; + +import java.lang.reflect.InvocationTargetException; +import java.util.List; + +import com.twitter.heron.eco.definition.BeanDefinition; +import com.twitter.heron.eco.definition.EcoExecutionContext; + +public class ComponentBuilder { + protected void buildComponents(EcoExecutionContext context, ObjectBuilder objectBuilder) + throws ClassNotFoundException, + IllegalAccessException, InstantiationException, + NoSuchFieldException, InvocationTargetException { + List componentDefinitions = context.getTopologyDefinition().getComponents(); + + if (componentDefinitions != null) { + for (BeanDefinition bean : componentDefinitions) { + Object obj = objectBuilder.buildObject(bean, context); + context.addComponent(bean.getId(), obj); + } + } + } +} diff --git a/heron/eco/src/java/com/twitter/heron/eco/builder/ConfigBuilder.java b/heron/eco/src/java/com/twitter/heron/eco/builder/ConfigBuilder.java new file mode 100644 index 00000000000..a6b758b553a --- /dev/null +++ b/heron/eco/src/java/com/twitter/heron/eco/builder/ConfigBuilder.java @@ -0,0 +1,32 @@ +// Copyright 2017 Twitter. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +package com.twitter.heron.eco.builder; + +import java.util.Map; + +import org.apache.storm.Config; + +import com.twitter.heron.eco.definition.EcoTopologyDefinition; + +public class ConfigBuilder { + protected Config buildConfig(EcoTopologyDefinition topologyDefinition) { + + Map configMap = topologyDefinition.getConfig(); + Config config = new Config(); + for (Map.Entry entry: configMap.entrySet()) { + config.put(entry.getKey(), entry.getValue()); + } + return config; + } +} diff --git a/heron/eco/src/java/com/twitter/heron/eco/builder/EcoBuilder.java b/heron/eco/src/java/com/twitter/heron/eco/builder/EcoBuilder.java new file mode 100644 index 00000000000..c448f72e8a1 --- /dev/null +++ b/heron/eco/src/java/com/twitter/heron/eco/builder/EcoBuilder.java @@ -0,0 +1,66 @@ +// Copyright 2017 Twitter. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +package com.twitter.heron.eco.builder; + + +import java.lang.reflect.InvocationTargetException; + +import org.apache.storm.Config; +import org.apache.storm.topology.TopologyBuilder; + +import com.twitter.heron.eco.definition.EcoExecutionContext; +import com.twitter.heron.eco.definition.EcoTopologyDefinition; + + +public class EcoBuilder { + + private SpoutBuilder spoutBuilder; + + private BoltBuilder boltBuilder; + + private StreamBuilder streamBuilder; + + private ComponentBuilder componentBuilder; + + private ConfigBuilder configBuilder; + + public EcoBuilder(SpoutBuilder spoutBuilder, BoltBuilder boltBuilder, + StreamBuilder streamBuilder, ComponentBuilder componentBuilder, + ConfigBuilder configBuilder) { + this.spoutBuilder = spoutBuilder; + this.boltBuilder = boltBuilder; + this.streamBuilder = streamBuilder; + this.componentBuilder = componentBuilder; + this.configBuilder = configBuilder; + } + + public TopologyBuilder buildTopologyBuilder(EcoExecutionContext executionContext, + ObjectBuilder objectBuilder) + throws InstantiationException, IllegalAccessException, + ClassNotFoundException, + NoSuchFieldException, InvocationTargetException { + + TopologyBuilder builder = new TopologyBuilder(); + componentBuilder.buildComponents(executionContext, objectBuilder); + spoutBuilder.buildSpouts(executionContext, builder, objectBuilder); + boltBuilder.buildBolts(executionContext, objectBuilder); + streamBuilder.buildStreams(executionContext, builder, objectBuilder); + + return builder; + } + + public Config buildConfig(EcoTopologyDefinition topologyDefinition) { + return this.configBuilder.buildConfig(topologyDefinition); + } +} diff --git a/heron/eco/src/java/com/twitter/heron/eco/builder/ObjectBuilder.java b/heron/eco/src/java/com/twitter/heron/eco/builder/ObjectBuilder.java new file mode 100644 index 00000000000..5f2f441d468 --- /dev/null +++ b/heron/eco/src/java/com/twitter/heron/eco/builder/ObjectBuilder.java @@ -0,0 +1,311 @@ +// Copyright 2017 Twitter. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +package com.twitter.heron.eco.builder; + +import java.lang.reflect.Array; +import java.lang.reflect.Constructor; +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; +import java.util.ArrayList; +import java.util.List; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.twitter.heron.eco.definition.ConfigurationMethodDefinition; +import com.twitter.heron.eco.definition.EcoExecutionContext; +import com.twitter.heron.eco.definition.ObjectDefinition; + +public class ObjectBuilder { + private static final Logger LOG = LoggerFactory.getLogger(ObjectBuilder.class); + + private BuilderUtility builderUtility; + + public void setBuilderUtility(BuilderUtility builderUtility) { + this.builderUtility = builderUtility; + } + + @SuppressWarnings("rawtypes") + public Object buildObject(ObjectDefinition def, EcoExecutionContext context) + throws ClassNotFoundException, IllegalAccessException, InstantiationException, + InvocationTargetException, NoSuchFieldException { + Class clazz = builderUtility.classForName(def.getClassName()); + + Object obj; + if (def.hasConstructorArgs()) { + LOG.debug("Found constructor arguments in definition "); + List cArgs = def.getConstructorArgs(); + + if (def.hasReferences()) { + LOG.debug("The definition has references"); + cArgs = builderUtility.resolveReferences(cArgs, context); + } else { + LOG.debug("The definition does not have references"); + } + LOG.debug("finding compatible constructor for : " + clazz.getName()); + Constructor con = findCompatibleConstructor(cArgs, clazz); + if (con != null) { + LOG.debug("Found something seemingly compatible, attempting invocation..."); + obj = con.newInstance(getArgsWithListCoercian(cArgs, con.getParameterTypes())); + } else { + String msg = String + .format("Couldn't find a suitable constructor for class '%s' with arguments '%s'.", + clazz.getName(), + cArgs); + throw new IllegalArgumentException(msg); + } + } else { + obj = clazz.newInstance(); + } + builderUtility.applyProperties(def, obj, context); + invokeConfigMethods(def, obj, context); + return obj; + } + + @SuppressWarnings("rawtypes") + protected Constructor findCompatibleConstructor(List args, Class target) { + Constructor retval = null; + int eligibleCount = 0; + + LOG.debug("Target class: " + target.getName() + ", constructor args: " + args); + Constructor[] cons = target.getDeclaredConstructors(); + + for (Constructor con : cons) { + Class[] paramClasses = con.getParameterTypes(); + + if (paramClasses.length == args.size()) { + LOG.debug("found constructor with same number of args.."); + boolean invokable = canInvokeWithArgs(args, con.getParameterTypes()); + if (invokable) { + retval = con; + eligibleCount++; + } + LOG.debug("** invokable --> {}" + invokable); + } else { + LOG.debug("Skipping constructor with wrong number of arguments."); + } + } + if (eligibleCount > 1) { + LOG.error("Found multiple invokable constructors for class: " + + target + ", given arguments " + args + ". Using the last one found."); + } + return retval; + } + + @SuppressWarnings({"unchecked", "rawtypes"}) + protected boolean canInvokeWithArgs(List args, Class[] parameterTypes) { + if (parameterTypes.length != args.size()) { + LOG.warn("parameter types were the wrong size"); + return false; + } + + for (int i = 0; i < args.size(); i++) { + Object obj = args.get(i); + if (obj == null) { + throw new IllegalArgumentException("argument shouldn't be null - index: " + i); + } + Class paramType = parameterTypes[i]; + Class objectType = obj.getClass(); + LOG.debug("Comparing parameter class " + paramType + " to object class " + + objectType + "to see if assignment is possible."); + if (paramType.equals(objectType)) { + LOG.debug("Yes, they are the same class."); + } else if (paramType.isAssignableFrom(objectType)) { + LOG.debug("Yes, assignment is possible."); + } else if (isPrimitiveBoolean(paramType) && Boolean.class.isAssignableFrom(objectType)) { + LOG.debug("Yes, assignment is possible."); + } else if (isPrimitiveNumber(paramType) && Number.class.isAssignableFrom(objectType)) { + LOG.debug("Yes, assignment is possible."); + } else if (paramType.isEnum() && objectType.equals(String.class)) { + LOG.debug("Yes, will convert a String to enum"); + } else if (paramType.isArray() && List.class.isAssignableFrom(objectType)) { + LOG.debug("Assignment is possible if we convert a List to an array."); + LOG.debug("Array Type: " + paramType.getComponentType() + ", List type: " + + ((List) obj).get(0).getClass()); + } else { + LOG.debug("returning false"); + return false; + } + } + return true; + } + + @SuppressWarnings("rawtypes") + protected boolean isPrimitiveNumber(Class clazz) { + return clazz.isPrimitive() && !clazz.equals(boolean.class); + } + + @SuppressWarnings("rawtypes") + protected boolean isPrimitiveBoolean(Class clazz) { + return clazz.isPrimitive() && clazz.equals(boolean.class); + } + + @SuppressWarnings("rawtypes") + public void invokeConfigMethods(ObjectDefinition bean, + Object instance, EcoExecutionContext context) + throws InvocationTargetException, IllegalAccessException { + + List methodDefs = bean.getConfigMethods(); + if (methodDefs == null || methodDefs.size() == 0) { + return; + } + Class clazz = instance.getClass(); + for (ConfigurationMethodDefinition methodDef : methodDefs) { + List args = methodDef.getArgs(); + if (args == null) { + args = new ArrayList(); + } + if (methodDef.hasReferences()) { + args = builderUtility.resolveReferences(args, context); + } + String methodName = methodDef.getName(); + LOG.debug("method name: " + methodName); + Method method = findCompatibleMethod(args, clazz, methodName); + if (method != null) { + Object[] methodArgs = getArgsWithListCoercian(args, method.getParameterTypes()); + method.invoke(instance, methodArgs); + } else { + String msg = String + .format("Unable to find configuration method '%s' in class '%s' with arguments %s.", + new Object[]{methodName, clazz.getName(), args}); + throw new IllegalArgumentException(msg); + } + } + } + + @SuppressWarnings("rawtypes") + private Method findCompatibleMethod(List args, Class target, String methodName) { + Method retval = null; + int eligibleCount = 0; + LOG.debug("Target class: " + target.getName() + ", methodName: " + + methodName + ", args: " + args); + Method[] methods = target.getMethods(); + LOG.debug("methods count: " + methods.length); + for (Method method : methods) { + Class[] paramClasses = method.getParameterTypes(); + if (paramClasses.length == args.size() && method.getName().equals(methodName)) { + LOG.debug("found constructor with same number of args.."); + boolean invokable = false; + if (args.size() == 0) { + // it's a method with zero args + invokable = true; + } else { + invokable = canInvokeWithArgs(args, method.getParameterTypes()); + } + if (invokable) { + retval = method; + eligibleCount++; + } + LOG.debug("** invokable --> " + invokable); + } else { + LOG.debug("Skipping method with wrong number of arguments."); + } + } + if (eligibleCount > 1) { + LOG.warn("Found multiple invokable methods for class, method, given arguments {} " + + new Object[]{target, methodName, args}); + } + return retval; + } + + + + /** + * Given a java.util.List of contructor/method arguments, and a list of parameter types, + * attempt to convert the + * list to an java.lang.Object array that can be used to invoke the constructor. + * If an argument needs + * to be coerced from a List to an Array, do so. + */ + @SuppressWarnings({"unchecked", "rawtypes"}) + private Object[] getArgsWithListCoercian(List args, Class[] parameterTypes) { +// Class[] parameterTypes = constructor.getParameterTypes(); + if (parameterTypes.length != args.size()) { + throw new IllegalArgumentException("Contructor parameter count does not " + + "egual argument size."); + } + Object[] constructorParams = new Object[args.size()]; + + // loop through the arguments, if we hit a list that has to be convered to an array, + // perform the conversion + for (int i = 0; i < args.size(); i++) { + Object obj = args.get(i); + Class paramType = parameterTypes[i]; + Class objectType = obj.getClass(); + LOG.debug("Comparing parameter class " + paramType.getName() + " to object class " + + objectType.getName() + " to see if assignment is possible."); + if (paramType.equals(objectType)) { + LOG.debug("They are the same class."); + constructorParams[i] = args.get(i); + continue; + } + if (paramType.isAssignableFrom(objectType)) { + LOG.debug("Assignment is possible."); + constructorParams[i] = args.get(i); + continue; + } + if (isPrimitiveBoolean(paramType) && Boolean.class.isAssignableFrom(objectType)) { + LOG.debug("Its a primitive boolean."); + Boolean bool = (Boolean) args.get(i); + constructorParams[i] = bool.booleanValue(); + continue; + } + if (isPrimitiveNumber(paramType) && Number.class.isAssignableFrom(objectType)) { + LOG.debug("Its a primitive number."); + Number num = (Number) args.get(i); + if (paramType == Float.TYPE) { + constructorParams[i] = num.floatValue(); + } else if (paramType == Double.TYPE) { + constructorParams[i] = num.doubleValue(); + } else if (paramType == Long.TYPE) { + constructorParams[i] = num.longValue(); + } else if (paramType == Integer.TYPE) { + constructorParams[i] = num.intValue(); + } else if (paramType == Short.TYPE) { + constructorParams[i] = num.shortValue(); + } else if (paramType == Byte.TYPE) { + constructorParams[i] = num.byteValue(); + } else { + constructorParams[i] = args.get(i); + } + continue; + } + + // enum conversion + if (paramType.isEnum() && objectType.equals(String.class)) { + LOG.debug("Yes, will convert a String to enum"); + constructorParams[i] = Enum.valueOf(paramType, (String) args.get(i)); + continue; + } + + // List to array conversion + if (paramType.isArray() && List.class.isAssignableFrom(objectType)) { + LOG.debug("Conversion appears possible..."); + List list = (List) obj; + LOG.debug("Array Type: {}, List type: {}" + paramType.getComponentType() + + list.get(0).getClass()); + + // create an array of the right type + Object newArrayObj = Array.newInstance(paramType.getComponentType(), list.size()); + for (int j = 0; j < list.size(); j++) { + Array.set(newArrayObj, j, list.get(j)); + + } + constructorParams[i] = newArrayObj; + LOG.debug("After conversion: {}" + constructorParams[i]); + } + } + return constructorParams; + } +} diff --git a/heron/eco/src/java/com/twitter/heron/eco/builder/SpoutBuilder.java b/heron/eco/src/java/com/twitter/heron/eco/builder/SpoutBuilder.java new file mode 100644 index 00000000000..19dbfc95a13 --- /dev/null +++ b/heron/eco/src/java/com/twitter/heron/eco/builder/SpoutBuilder.java @@ -0,0 +1,41 @@ +// Copyright 2017 Twitter. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +package com.twitter.heron.eco.builder; + +import java.lang.reflect.InvocationTargetException; + +import org.apache.storm.topology.IRichSpout; +import org.apache.storm.topology.TopologyBuilder; + +import com.twitter.heron.eco.definition.EcoExecutionContext; +import com.twitter.heron.eco.definition.EcoTopologyDefinition; +import com.twitter.heron.eco.definition.ObjectDefinition; + + +public class SpoutBuilder { + + protected void buildSpouts(EcoExecutionContext executionContext, + TopologyBuilder builder, + ObjectBuilder objectBuilder) + throws ClassNotFoundException, InstantiationException, IllegalAccessException, + NoSuchFieldException, InvocationTargetException { + EcoTopologyDefinition topologyDefinition = executionContext.getTopologyDefinition(); + + for (ObjectDefinition def: topologyDefinition.getSpouts()) { + Object obj = objectBuilder.buildObject(def, executionContext); + builder.setSpout(def.getId(), (IRichSpout) obj, def.getParallelism()); + executionContext.addSpout(def.getId(), obj); + } + } +} diff --git a/heron/eco/src/java/com/twitter/heron/eco/builder/StreamBuilder.java b/heron/eco/src/java/com/twitter/heron/eco/builder/StreamBuilder.java new file mode 100644 index 00000000000..e31e3f0db3e --- /dev/null +++ b/heron/eco/src/java/com/twitter/heron/eco/builder/StreamBuilder.java @@ -0,0 +1,129 @@ +// Copyright 2017 Twitter. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +package com.twitter.heron.eco.builder; + +import java.lang.reflect.InvocationTargetException; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import org.apache.storm.grouping.CustomStreamGrouping; +import org.apache.storm.topology.BoltDeclarer; +import org.apache.storm.topology.IBasicBolt; +import org.apache.storm.topology.IRichBolt; +import org.apache.storm.topology.IWindowedBolt; +import org.apache.storm.topology.TopologyBuilder; +import org.apache.storm.tuple.Fields; +import org.apache.storm.utils.Utils; + +import com.twitter.heron.eco.definition.ComponentStream; +import com.twitter.heron.eco.definition.EcoExecutionContext; +import com.twitter.heron.eco.definition.EcoTopologyDefinition; +import com.twitter.heron.eco.definition.GroupingDefinition; +import com.twitter.heron.eco.definition.ObjectDefinition; +import com.twitter.heron.eco.definition.StreamDefinition; + +public class StreamBuilder { + + protected void buildStreams(EcoExecutionContext executionContext, TopologyBuilder builder, + ObjectBuilder objectBuilder) + throws IllegalAccessException, InstantiationException, ClassNotFoundException, + NoSuchFieldException, InvocationTargetException { + EcoTopologyDefinition topologyDefinition = executionContext.getTopologyDefinition(); + Map componentStreams = new HashMap<>(); + + HashMap declarers = new HashMap<>(); + for (StreamDefinition stream : topologyDefinition.getStreams()) { + Object boltObj = executionContext.getBolt(stream.getTo()); + BoltDeclarer declarer = declarers.get(stream.getTo()); + if (boltObj instanceof IRichBolt) { + if (declarer == null) { + declarer = builder.setBolt(stream.getTo(), + (IRichBolt) boltObj, + topologyDefinition.parallelismForBolt(stream.getTo())); + declarers.put(stream.getTo(), declarer); + } + } else if (boltObj instanceof IBasicBolt) { + if (declarer == null) { + declarer = builder.setBolt( + stream.getTo(), + (IBasicBolt) boltObj, + topologyDefinition.parallelismForBolt(stream.getTo())); + declarers.put(stream.getTo(), declarer); + } + } else if (boltObj instanceof IWindowedBolt) { + if (declarer == null) { + declarer = builder.setBolt( + stream.getTo(), + (IWindowedBolt) boltObj, + topologyDefinition.parallelismForBolt(stream.getTo())); + declarers.put(stream.getTo(), declarer); + } + } else { + throw new IllegalArgumentException("Class does not appear to be a bolt: " + + boltObj.getClass().getName()); + } + + GroupingDefinition grouping = stream.getGrouping(); + // if the streamId is defined, use it for the grouping, + // otherwise assume default stream + // Todo(joshfischer) Not sure if "default" is still valid + String streamId = grouping.getStreamId() == null + ? Utils.DEFAULT_STREAM_ID : grouping.getStreamId(); + + + switch (grouping.getType()) { + case SHUFFLE: + declarer.shuffleGrouping(stream.getFrom(), streamId); + break; + case FIELDS: + //TODO check for null grouping args + List groupingArgs = grouping.getArgs(); + if (groupingArgs == null) { + throw new IllegalArgumentException("You must supply arguments for Fields grouping"); + } + declarer.fieldsGrouping(stream.getFrom(), streamId, new Fields(groupingArgs)); + break; + case ALL: + declarer.allGrouping(stream.getFrom(), streamId); + break; + case GLOBAL: + declarer.globalGrouping(stream.getFrom(), streamId); + break; + case NONE: + declarer.noneGrouping(stream.getFrom(), streamId); + break; + case CUSTOM: + declarer.customGrouping(stream.getFrom(), streamId, + buildCustomStreamGrouping(stream.getGrouping().getCustomClass(), + executionContext, + objectBuilder)); + break; + default: + throw new UnsupportedOperationException("unsupported grouping type: " + grouping); + } + } + executionContext.setStreams(componentStreams); + } + + private CustomStreamGrouping buildCustomStreamGrouping(ObjectDefinition objectDefinition, + EcoExecutionContext executionContext, + ObjectBuilder objectBuilder) + throws ClassNotFoundException, + IllegalAccessException, InstantiationException, NoSuchFieldException, + InvocationTargetException { + Object grouping = objectBuilder.buildObject(objectDefinition, executionContext); + return (CustomStreamGrouping) grouping; + } +} diff --git a/heron/eco/src/java/com/twitter/heron/eco/definition/BeanDefinition.java b/heron/eco/src/java/com/twitter/heron/eco/definition/BeanDefinition.java new file mode 100644 index 00000000000..806ae14e637 --- /dev/null +++ b/heron/eco/src/java/com/twitter/heron/eco/definition/BeanDefinition.java @@ -0,0 +1,27 @@ +// Copyright 2017 Twitter. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +package com.twitter.heron.eco.definition; + +public class BeanDefinition extends ObjectDefinition { + + private String id; + + public String getId() { + return id; + } + + public void setId(String id) { + this.id = id; + } +} diff --git a/heron/eco/src/java/com/twitter/heron/eco/definition/BeanListReference.java b/heron/eco/src/java/com/twitter/heron/eco/definition/BeanListReference.java new file mode 100644 index 00000000000..ff70fde8433 --- /dev/null +++ b/heron/eco/src/java/com/twitter/heron/eco/definition/BeanListReference.java @@ -0,0 +1,30 @@ +// Copyright 2017 Twitter. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +package com.twitter.heron.eco.definition; + +import java.util.List; + +public class BeanListReference { + public List ids; + + public BeanListReference() { } + + public BeanListReference(List ids) { + this.ids = ids; + } + + public List getIds() { + return ids; + } +} diff --git a/heron/eco/src/java/com/twitter/heron/eco/definition/BeanReference.java b/heron/eco/src/java/com/twitter/heron/eco/definition/BeanReference.java new file mode 100644 index 00000000000..2314c82a671 --- /dev/null +++ b/heron/eco/src/java/com/twitter/heron/eco/definition/BeanReference.java @@ -0,0 +1,30 @@ +// Copyright 2017 Twitter. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +package com.twitter.heron.eco.definition; + +public class BeanReference { + private String id; + + public BeanReference(String id) { + this.id = id; + } + + public String getId() { + return id; + } + + public void setId(String id) { + this.id = id; + } +} diff --git a/heron/eco/src/java/com/twitter/heron/eco/definition/BoltDefinition.java b/heron/eco/src/java/com/twitter/heron/eco/definition/BoltDefinition.java new file mode 100644 index 00000000000..203623dca02 --- /dev/null +++ b/heron/eco/src/java/com/twitter/heron/eco/definition/BoltDefinition.java @@ -0,0 +1,17 @@ +// Copyright 2017 Twitter. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +package com.twitter.heron.eco.definition; + +public class BoltDefinition extends ObjectDefinition { +} diff --git a/heron/eco/src/java/com/twitter/heron/eco/definition/ComponentStream.java b/heron/eco/src/java/com/twitter/heron/eco/definition/ComponentStream.java new file mode 100644 index 00000000000..5b747fde08f --- /dev/null +++ b/heron/eco/src/java/com/twitter/heron/eco/definition/ComponentStream.java @@ -0,0 +1,67 @@ +// Copyright 2017 Twitter. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +package com.twitter.heron.eco.definition; + +public class ComponentStream { + + private String id; + + private String toComponent; + + private String fromComponent; + + private String streamName; + + public String getId() { + return id; + } + + public void setId(String id) { + this.id = id; + } + + public Object getToComponent() { + return toComponent; + } + + public void setToComponent(String toComponent) { + this.toComponent = toComponent; + } + + public String getFromComponent() { + return fromComponent; + } + + public void setFromComponent(String fromComponent) { + this.fromComponent = fromComponent; + } + + public String getStreamName() { + return streamName; + } + + public void setStreamName(String streamName) { + this.streamName = streamName; + } + + @Override + public String toString() { + return "ComponentStream{" + + "toComponent='" + toComponent + '\'' + + ", fromComponent='" + fromComponent + + '\'' + + ", streamName='" + streamName + '\'' + + '}'; + } +} diff --git a/heron/eco/src/java/com/twitter/heron/eco/definition/ConfigurationMethodDefinition.java b/heron/eco/src/java/com/twitter/heron/eco/definition/ConfigurationMethodDefinition.java new file mode 100644 index 00000000000..d63918b7136 --- /dev/null +++ b/heron/eco/src/java/com/twitter/heron/eco/definition/ConfigurationMethodDefinition.java @@ -0,0 +1,64 @@ +// Copyright 2017 Twitter. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +package com.twitter.heron.eco.definition; + +import java.util.ArrayList; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; + +public class ConfigurationMethodDefinition { + private String name; + private List args; + private boolean hasReferences = false; + + public String getName() { + return name; + } + + public void setName(String name) { + this.name = name; + } + + public List getArgs() { + return args; + } + + @SuppressWarnings({"rawtypes", "unchecked"}) + public void setArgs(List args) { + + List newVal = new ArrayList(); + for (Object obj : args) { + if (obj instanceof LinkedHashMap) { + Map map = (Map) obj; + if (map.containsKey("ref") && map.size() == 1) { + newVal.add(new BeanReference((String) map.get("ref"))); + this.hasReferences = true; + } else if (map.containsKey("reflist") && map.size() == 1) { + newVal.add(new BeanListReference((List) map.get("reflist"))); + this.hasReferences = true; + } else { + newVal.add(obj); + } + } else { + newVal.add(obj); + } + } + this.args = newVal; + } + + public boolean hasReferences() { + return this.hasReferences; + } +} diff --git a/heron/eco/src/java/com/twitter/heron/eco/definition/EcoExecutionContext.java b/heron/eco/src/java/com/twitter/heron/eco/definition/EcoExecutionContext.java new file mode 100644 index 00000000000..6a1c8daca9b --- /dev/null +++ b/heron/eco/src/java/com/twitter/heron/eco/definition/EcoExecutionContext.java @@ -0,0 +1,112 @@ +// Copyright 2017 Twitter. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +package com.twitter.heron.eco.definition; + +import java.util.HashMap; +import java.util.Map; + +import org.apache.storm.Config; + + +public class EcoExecutionContext { + + private EcoTopologyDefinition topologyDefinition; + + private Config config; + + private Map spouts = new HashMap<>(); + + private Map bolts = new HashMap<>(); + + private Map streams = new HashMap<>(); + + private Map components = new HashMap<>(); + + public EcoExecutionContext(EcoTopologyDefinition topologyDefinition, Config config) { + this.topologyDefinition = topologyDefinition; + this.config = config; + } + + public EcoTopologyDefinition getTopologyDefinition() { + return topologyDefinition; + } + + public void setTopologyDefinition(EcoTopologyDefinition topologyDefinition) { + this.topologyDefinition = topologyDefinition; + } + + public Config getConfig() { + return config; + } + + public void setConfig(Config config) { + this.config = config; + } + + public Map getSpouts() { + return spouts; + } + + public void setSpouts(Map spouts) { + this.spouts = spouts; + } + + public Map getBolts() { + return bolts; + } + + public Object getBolt(String id) { + return this.bolts.get(id); + } + + public void setBolts(Map bolts) { + this.bolts = bolts; + } + + public void addBolt(String key, Object value) { + this.bolts.put(key, value); + } + + public Object getChild(String id) { + return this.bolts.get(id); + } + + public Map getStreams() { + return streams; + } + + public void setStreams(Map streams) { + this.streams = streams; + } + + public Map getComponents() { + return components; + } + + public void addComponent(String key, Object value) { + this.components.put(key, value); + } + + public Object getComponent(String id) { + return this.components.get(id); + } + + public void setComponents(Map components) { + this.components = components; + } + + public void addSpout(String key, Object value) { + this.spouts.put(key, value); + } +} diff --git a/heron/eco/src/java/com/twitter/heron/eco/definition/EcoTopologyDefinition.java b/heron/eco/src/java/com/twitter/heron/eco/definition/EcoTopologyDefinition.java new file mode 100644 index 00000000000..b5b8ad9f25c --- /dev/null +++ b/heron/eco/src/java/com/twitter/heron/eco/definition/EcoTopologyDefinition.java @@ -0,0 +1,108 @@ +// Copyright 2017 Twitter. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +package com.twitter.heron.eco.definition; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; + +public class EcoTopologyDefinition { + + private String name; + private Map config = new HashMap<>(); + private Map spouts = new LinkedHashMap<>(); + private Map bolts = new LinkedHashMap<>(); + private List streams = new ArrayList<>(); + private Map components = new LinkedHashMap<>(); + + public List getSpouts() { + return new ArrayList<>(this.spouts.values()); + } + + public SpoutDefinition getSpout(String id) { + return this.spouts.get(id); + } + + public void setSpouts(List sources) { + this.spouts = new LinkedHashMap<>(); + for (SpoutDefinition source: sources) { + this.spouts.put(source.getId(), source); + } + } + + public List getBolts() { + return new ArrayList<>(this.bolts.values()); + } + + public BoltDefinition getBolt(String id) { + return this.bolts.get(id); + } + + public void setBolts(List children) { + this.bolts = new LinkedHashMap<>(); + for (BoltDefinition child: children) { + this.bolts.put(child.getId(), child); + } + } + + public List getComponents() { + return new ArrayList<>(this.components.values()); + } + + public Object getComponent(String id) { + return this.components.get(id); + } + + public void setComponents(List components) { + for (BeanDefinition bean: components) { + this.components.put(bean.getId(), bean); + } + } + + public void addComponent(String key, BeanDefinition value) { + this.components.put(key, value); + } + + public List getStreams() { + return streams; + } + + public void setStreams(List streams) { + this.streams = streams; + } + + + public Map getConfig() { + return config; + } + + public void setConfig(Map config) { + this.config = config; + } + + public String getName() { + + return name; + } + + public void setName(String name) { + this.name = name; + } + + public Number parallelismForBolt(String to) { + return this.bolts.get(to).getParallelism(); + } +} diff --git a/heron/eco/src/java/com/twitter/heron/eco/definition/GroupingDefinition.java b/heron/eco/src/java/com/twitter/heron/eco/definition/GroupingDefinition.java new file mode 100644 index 00000000000..1eeb8170bad --- /dev/null +++ b/heron/eco/src/java/com/twitter/heron/eco/definition/GroupingDefinition.java @@ -0,0 +1,65 @@ +// Copyright 2017 Twitter. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +package com.twitter.heron.eco.definition; + +import java.util.List; + +public class GroupingDefinition { + + public enum Type { + ALL, + CUSTOM, + SHUFFLE, + FIELDS, + GLOBAL, + NONE + } + + private Type type; + private String streamId; + private List args; + private ObjectDefinition customClass; + + public Type getType() { + return type; + } + + public void setType(Type type) { + this.type = type; + } + + public String getStreamId() { + return streamId; + } + + public void setStreamId(String streamId) { + this.streamId = streamId; + } + + public List getArgs() { + return args; + } + + public void setArgs(List args) { + this.args = args; + } + + public ObjectDefinition getCustomClass() { + return customClass; + } + + public void setCustomClass(ObjectDefinition customClass) { + this.customClass = customClass; + } +} diff --git a/heron/eco/src/java/com/twitter/heron/eco/definition/ObjectDefinition.java b/heron/eco/src/java/com/twitter/heron/eco/definition/ObjectDefinition.java new file mode 100644 index 00000000000..3a675b7c072 --- /dev/null +++ b/heron/eco/src/java/com/twitter/heron/eco/definition/ObjectDefinition.java @@ -0,0 +1,106 @@ +// Copyright 2017 Twitter. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +package com.twitter.heron.eco.definition; + + +import java.util.ArrayList; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; + +public abstract class ObjectDefinition { + + private String id; + private String className; + private int parallelism = 1; + private List constructorArgs; + private List properties; + private List configMethods; + private boolean hasReferences; + + public List getProperties() { + return properties; + } + + public boolean hasConstructorArgs() { + return this.constructorArgs != null && this.constructorArgs.size() > 0; + } + + public void setProperties(List properties) { + this.properties = properties; + } + + public boolean hasReferences() { + return this.hasReferences; + } + + public String getId() { + return id; + } + + public void setId(String id) { + this.id = id; + } + + public String getClassName() { + return className; + } + + public void setClassName(String className) { + this.className = className; + } + + public int getParallelism() { + return parallelism; + } + + public void setParallelism(int parallelism) { + this.parallelism = parallelism; + } + + public List getConstructorArgs() { + return constructorArgs; + } + + @SuppressWarnings({"rawtypes", "unchecked"}) + public void setConstructorArgs(List constructorArgs) { + + List newVal = new ArrayList(); + for (Object obj : constructorArgs) { + if (obj instanceof LinkedHashMap) { + Map map = (Map) obj; + if (map.containsKey("ref") && map.size() == 1) { + newVal.add(new BeanReference((String) map.get("ref"))); + this.hasReferences = true; + } else if (map.containsKey("reflist") && map.size() == 1) { + newVal.add(new BeanListReference((List) map.get("reflist"))); + this.hasReferences = true; + } else { + newVal.add(obj); + } + } else { + newVal.add(obj); + } + } + this.constructorArgs = newVal; + } + + public List getConfigMethods() { + return configMethods; + } + + public void setConfigMethods(List configMethods) { + this.configMethods = configMethods; + } +} diff --git a/heron/eco/src/java/com/twitter/heron/eco/definition/PropertyDefinition.java b/heron/eco/src/java/com/twitter/heron/eco/definition/PropertyDefinition.java new file mode 100644 index 00000000000..e1cc902f4b9 --- /dev/null +++ b/heron/eco/src/java/com/twitter/heron/eco/definition/PropertyDefinition.java @@ -0,0 +1,55 @@ +// Copyright 2017 Twitter. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +package com.twitter.heron.eco.definition; + +public class PropertyDefinition { + + private String name; + private Object value; + private String ref; + + public String getName() { + return name; + } + + public void setName(String name) { + this.name = name; + } + + public Object getValue() { + return value; + } + + public void setValue(Object value) { + if (this.ref != null) { + throw new IllegalStateException("A property can only have a value OR a reference, not both."); + } + this.value = value; + } + + public String getRef() { + return ref; + } + + public void setRef(String ref) { + if (this.value != null) { + throw new IllegalStateException("A property can only have a value OR a reference, not both."); + } + this.ref = ref; + } + + public boolean isReference() { + return this.ref != null; + } +} diff --git a/heron/eco/src/java/com/twitter/heron/eco/definition/SpoutDefinition.java b/heron/eco/src/java/com/twitter/heron/eco/definition/SpoutDefinition.java new file mode 100644 index 00000000000..cb4f0a13164 --- /dev/null +++ b/heron/eco/src/java/com/twitter/heron/eco/definition/SpoutDefinition.java @@ -0,0 +1,17 @@ +// Copyright 2017 Twitter. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +package com.twitter.heron.eco.definition; + +public class SpoutDefinition extends ObjectDefinition { +} diff --git a/heron/eco/src/java/com/twitter/heron/eco/definition/StreamDefinition.java b/heron/eco/src/java/com/twitter/heron/eco/definition/StreamDefinition.java new file mode 100644 index 00000000000..03081c37d36 --- /dev/null +++ b/heron/eco/src/java/com/twitter/heron/eco/definition/StreamDefinition.java @@ -0,0 +1,63 @@ +// Copyright 2017 Twitter. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +package com.twitter.heron.eco.definition; + +public class StreamDefinition { + + private String id; + private String name; + private String to; + private String from; + private GroupingDefinition grouping; + + public String getId() { + return id; + } + + public void setId(String id) { + this.id = id; + } + + public String getName() { + return name; + } + + public void setName(String name) { + this.name = name; + } + + public String getTo() { + return to; + } + + public void setTo(String to) { + this.to = to; + } + + public String getFrom() { + return from; + } + + public void setFrom(String from) { + this.from = from; + } + + public GroupingDefinition getGrouping() { + return grouping; + } + + public void setGrouping(GroupingDefinition groupingDefinition) { + this.grouping = groupingDefinition; + } +} diff --git a/heron/eco/src/java/com/twitter/heron/eco/parser/EcoParser.java b/heron/eco/src/java/com/twitter/heron/eco/parser/EcoParser.java new file mode 100644 index 00000000000..5aacfb3c130 --- /dev/null +++ b/heron/eco/src/java/com/twitter/heron/eco/parser/EcoParser.java @@ -0,0 +1,53 @@ +// Copyright 2017 Twitter. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +package com.twitter.heron.eco.parser; + +import java.io.InputStream; + +import org.yaml.snakeyaml.TypeDescription; +import org.yaml.snakeyaml.Yaml; +import org.yaml.snakeyaml.constructor.Constructor; + +import com.twitter.heron.eco.definition.BoltDefinition; +import com.twitter.heron.eco.definition.EcoTopologyDefinition; +import com.twitter.heron.eco.definition.SpoutDefinition; + +public class EcoParser { + + public EcoTopologyDefinition parseFromInputStream(InputStream inputStream) + throws Exception { + + Yaml yaml = topologyYaml(); + + if (inputStream == null) { + throw new Exception("Unable to load eco input stream"); + } + return loadTopologyFromYaml(yaml, inputStream); + } + + private EcoTopologyDefinition loadTopologyFromYaml(Yaml yaml, InputStream inputStream) { + return (EcoTopologyDefinition) yaml.load(inputStream); + } + private static Yaml topologyYaml() { + Constructor topologyConstructor = new Constructor(EcoTopologyDefinition.class); + + TypeDescription topologyDescription = new TypeDescription(EcoTopologyDefinition.class); + + topologyDescription.putListPropertyType("spouts", SpoutDefinition.class); + topologyDescription.putListPropertyType("bolts", BoltDefinition.class); + topologyConstructor.addTypeDescription(topologyDescription); + + return new Yaml(topologyConstructor); + } +} diff --git a/heron/eco/src/java/com/twitter/heron/eco/submit/EcoSubmitter.java b/heron/eco/src/java/com/twitter/heron/eco/submit/EcoSubmitter.java new file mode 100644 index 00000000000..032a73ee7d7 --- /dev/null +++ b/heron/eco/src/java/com/twitter/heron/eco/submit/EcoSubmitter.java @@ -0,0 +1,28 @@ +// Copyright 2018 Twitter. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +package com.twitter.heron.eco.submit; + +import org.apache.storm.Config; +import org.apache.storm.StormSubmitter; +import org.apache.storm.generated.AlreadyAliveException; +import org.apache.storm.generated.InvalidTopologyException; +import org.apache.storm.generated.StormTopology; + +public class EcoSubmitter { + + public void submitTopology(String topologyName, Config topologyConfig, StormTopology topology) + throws AlreadyAliveException, InvalidTopologyException { + StormSubmitter.submitTopology(topologyName, topologyConfig, topology); + } +} diff --git a/heron/eco/tests/java/BUILD b/heron/eco/tests/java/BUILD new file mode 100644 index 00000000000..ef66b769243 --- /dev/null +++ b/heron/eco/tests/java/BUILD @@ -0,0 +1,90 @@ +test_deps_files = [ + "//third_party/java:powermock", + "//third_party/java:mockito", + "//third_party/java:junit4", +] + +heron_local_deps = [ + "//heron/eco/src/java:eco-java", + "//heron/api/src/java:api-java-low-level", + "//storm-compatibility/src/java:storm-compatibility-java", +] + +eco_test_deps = heron_local_deps + test_deps_files + +java_test( + name = "EcoBuilderTest", + srcs = glob(["com/twitter/heron/eco/builder/EcoBuilderTest.java"]), + deps = eco_test_deps, + size = "small", +) + +java_test( + name = "EcoParserTest", + srcs = glob(["com/twitter/heron/eco/parser/EcoParserTest.java"]), + deps = eco_test_deps, + size = "small" +) + +java_test( + name = "ConfigBuilderTest", + srcs = glob(["com/twitter/heron/eco/builder/ConfigBuilderTest.java"]), + deps = eco_test_deps, + size = "small" +) + +java_test( + name = "BoltBuilderTest", + srcs = glob(["com/twitter/heron/eco/builder/BoltBuilderTest.java"]), + deps = eco_test_deps, + size = "small" +) + +java_test( + name = "ComponentBuilderTest", + srcs = glob(["com/twitter/heron/eco/builder/ComponentBuilderTest.java"]), + deps = eco_test_deps, + size = "small" +) + +java_test( + name = "SpoutBuilderTest", + srcs = glob(["com/twitter/heron/eco/builder/SpoutBuilderTest.java"]), + deps = eco_test_deps, + size = "small" +) + +java_test( + name = "StreamBuilderTest", + srcs = glob(["com/twitter/heron/eco/builder/StreamBuilderTest.java"]), + deps = eco_test_deps, + size = "small" +) + +java_test( + name = "EcoTest", + srcs = glob(["com/twitter/heron/eco/EcoTest.java"]), + deps = eco_test_deps, + size = "small" +) + +java_test( + name = "ObjectBuilderTest", + srcs = glob(["com/twitter/heron/eco/builder/ObjectBuilderTest.java"]), + deps = eco_test_deps, + size = "small" +) + +java_test( + name = "BuilderUtilityTest", + srcs = glob(["com/twitter/heron/eco/builder/BuilderUtilityTest.java"]), + deps = eco_test_deps, + size = "small" +) + +java_test( + name = "EcoSubmitterTest", + srcs = glob(["com/twitter/heron/eco/submit/EcoSubmitterTest.java"]), + deps = eco_test_deps, + size = "small" +) diff --git a/heron/eco/tests/java/com/twitter/heron/eco/EcoTest.java b/heron/eco/tests/java/com/twitter/heron/eco/EcoTest.java new file mode 100644 index 00000000000..ccf97f4f328 --- /dev/null +++ b/heron/eco/tests/java/com/twitter/heron/eco/EcoTest.java @@ -0,0 +1,91 @@ +// Copyright 2018 Twitter. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +package com.twitter.heron.eco; + +import java.io.FileInputStream; + + +import org.apache.storm.Config; +import org.apache.storm.generated.StormTopology; +import org.apache.storm.topology.TopologyBuilder; +import org.junit.After; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.InjectMocks; +import org.mockito.Mock; +import org.mockito.Mockito; +import org.mockito.runners.MockitoJUnitRunner; +import org.powermock.api.mockito.PowerMockito; + + +import com.twitter.heron.eco.builder.EcoBuilder; +import com.twitter.heron.eco.builder.ObjectBuilder; +import com.twitter.heron.eco.definition.EcoExecutionContext; +import com.twitter.heron.eco.definition.EcoTopologyDefinition; +import com.twitter.heron.eco.parser.EcoParser; +import com.twitter.heron.eco.submit.EcoSubmitter; + +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.eq; +import static org.mockito.Matchers.same; +import static org.mockito.Mockito.verify; +import static org.powermock.api.mockito.PowerMockito.when; + +@RunWith(MockitoJUnitRunner.class) +public class EcoTest { + + @Mock + private EcoBuilder mockEcoBuilder; + @Mock + private EcoParser mockEcoParser; + @Mock + private TopologyBuilder mockTopologyBuilder; + @Mock + private EcoSubmitter mockEcoSubmitter; + @InjectMocks + private Eco subject; + + @After + public void ensureNoUnexpectedMockInteractions() { + Mockito.verifyNoMoreInteractions(mockEcoBuilder, + mockEcoParser, + mockTopologyBuilder, + mockEcoSubmitter); + } + + @Test + public void testSubmit_AllGood_BehavesAsExpected() throws Exception { + FileInputStream mockStream = PowerMockito.mock(FileInputStream.class); + + final String topologyName = "the name"; + EcoTopologyDefinition topologyDefinition = new EcoTopologyDefinition(); + topologyDefinition.setName(topologyName); + Config config = new Config(); + + when(mockEcoParser.parseFromInputStream(eq(mockStream))).thenReturn(topologyDefinition); + when(mockEcoBuilder.buildConfig(eq(topologyDefinition))).thenReturn(config); + when(mockEcoBuilder.buildTopologyBuilder(any(EcoExecutionContext.class), + any(ObjectBuilder.class))).thenReturn(mockTopologyBuilder); + + subject.submit(mockStream); + + verify(mockEcoParser).parseFromInputStream(same(mockStream)); + verify(mockEcoBuilder).buildConfig(same(topologyDefinition)); + verify(mockEcoBuilder).buildTopologyBuilder(any(EcoExecutionContext.class), + any(ObjectBuilder.class)); + verify(mockTopologyBuilder).createTopology(); + verify(mockEcoSubmitter).submitTopology(any(String.class), any(Config.class), + any(StormTopology.class)); + } +} diff --git a/heron/eco/tests/java/com/twitter/heron/eco/builder/BoltBuilderTest.java b/heron/eco/tests/java/com/twitter/heron/eco/builder/BoltBuilderTest.java new file mode 100644 index 00000000000..5ae0c9d602b --- /dev/null +++ b/heron/eco/tests/java/com/twitter/heron/eco/builder/BoltBuilderTest.java @@ -0,0 +1,89 @@ +// Copyright 2018 Twitter. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +package com.twitter.heron.eco.builder; + +import java.lang.reflect.InvocationTargetException; +import java.util.ArrayList; +import java.util.List; + +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.Mock; +import org.mockito.Mockito; +import org.mockito.runners.MockitoJUnitRunner; + +import com.twitter.heron.eco.definition.BoltDefinition; +import com.twitter.heron.eco.definition.EcoExecutionContext; +import com.twitter.heron.eco.definition.EcoTopologyDefinition; + +import static org.mockito.Matchers.anyObject; +import static org.mockito.Matchers.eq; +import static org.mockito.Matchers.same; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +@RunWith(MockitoJUnitRunner.class) +public class BoltBuilderTest { + + @Mock + private EcoExecutionContext mockContext; + @Mock + private ObjectBuilder mockObjectBuilder; + + private BoltBuilder subject; + + @Before + public void setUpForEachTestCase() { + subject = new BoltBuilder(); + } + + @After + public void ensureNoUnexpectedMockInteractions() { + Mockito.verifyNoMoreInteractions(mockContext, + mockObjectBuilder); + } + + @Test + public void testBuildBolts_AllGood_BehavesAsExpected() throws ClassNotFoundException, + InvocationTargetException, NoSuchFieldException, InstantiationException, + IllegalAccessException { + EcoTopologyDefinition ecoTopologyDefinition = new EcoTopologyDefinition(); + BoltDefinition boltDefinition = new BoltDefinition(); + final String id = "id"; + boltDefinition.setId(id); + BoltDefinition boltDefinition1 = new BoltDefinition(); + final String id1 = "id1"; + boltDefinition1.setId(id1); + List boltDefinitions = new ArrayList<>(); + boltDefinitions.add(boltDefinition); + boltDefinitions.add(boltDefinition1); + ecoTopologyDefinition.setBolts(boltDefinitions); + Object object = new Object(); + Object object1 = new Object(); + + when(mockContext.getTopologyDefinition()).thenReturn(ecoTopologyDefinition); + when(mockObjectBuilder.buildObject(eq(boltDefinition), eq(mockContext))).thenReturn(object); + when(mockObjectBuilder.buildObject(eq(boltDefinition1), eq(mockContext))).thenReturn(object1); + + subject.buildBolts(mockContext, mockObjectBuilder); + + verify(mockContext).getTopologyDefinition(); + verify(mockObjectBuilder).buildObject(same(boltDefinition), same(mockContext)); + verify(mockObjectBuilder).buildObject(same(boltDefinition1), same(mockContext)); + verify(mockContext).addBolt(eq(id), anyObject()); + verify(mockContext).addBolt(eq(id1), anyObject()); + } +} diff --git a/heron/eco/tests/java/com/twitter/heron/eco/builder/BuilderUtilityTest.java b/heron/eco/tests/java/com/twitter/heron/eco/builder/BuilderUtilityTest.java new file mode 100644 index 00000000000..a92bc63da56 --- /dev/null +++ b/heron/eco/tests/java/com/twitter/heron/eco/builder/BuilderUtilityTest.java @@ -0,0 +1,139 @@ +// Copyright 2018 Twitter. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +package com.twitter.heron.eco.builder; + +import java.lang.reflect.InvocationTargetException; +import java.util.ArrayList; +import java.util.List; + +import org.apache.storm.testing.TestWordSpout; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.Mock; +import org.mockito.Mockito; +import org.mockito.runners.MockitoJUnitRunner; + +import com.twitter.heron.eco.definition.BeanReference; +import com.twitter.heron.eco.definition.EcoExecutionContext; +import com.twitter.heron.eco.definition.ObjectDefinition; +import com.twitter.heron.eco.definition.PropertyDefinition; + +import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.CoreMatchers.is; +import static org.hamcrest.CoreMatchers.notNullValue; + +import static org.junit.Assert.assertThat; + +import static org.mockito.Matchers.eq; +import static org.mockito.Matchers.same; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +@RunWith(MockitoJUnitRunner.class) +public class BuilderUtilityTest { + + + @Mock + private ObjectDefinition mockObjectDefinition; + @Mock + private Object mockObject; + @Mock + private EcoExecutionContext mockContext; + + private BuilderUtility subject; + + @Before + public void setUpForEachTestCase() { + subject = new BuilderUtility(); + } + + @After + public void ensureNoUnexpectedMockInteractions() { + Mockito.verifyNoMoreInteractions(mockObject, + mockObjectDefinition, + mockContext); + } + + @Test + public void toSetterName_ReturnsCorrectName() { + final String name = "name"; + final String expectedName = "setName"; + String setterName = subject.toSetterName(name); + + assertThat(setterName, is(equalTo(expectedName))); + } + + @Test + @SuppressWarnings("rawtypes") + public void classForName_ReturnsCorrectClass() throws ClassNotFoundException { + final String className = TestWordSpout.class.getName(); + + Class clazz = subject.classForName(className); + + assertThat(clazz, notNullValue()); + assertThat(className, is(equalTo(clazz.getName()))); + } + + @Test + public void applyProperties_SetterFound_BehavesAsExpected() + throws IllegalAccessException, NoSuchFieldException, + InvocationTargetException { + final String id = "id"; + final String ref = "ref"; + final String fakeComponent = "component"; + BeanReference beanReference = new BeanReference(id); + List propertyDefinitions = new ArrayList<>(); + PropertyDefinition propertyDefinition = new PropertyDefinition(); + propertyDefinition.setRef(ref); + propertyDefinition.setName(id); + propertyDefinitions.add(propertyDefinition); + + when(mockObjectDefinition.getProperties()).thenReturn(propertyDefinitions); + when(mockContext.getComponent(eq(ref))).thenReturn(fakeComponent); + + subject.applyProperties(mockObjectDefinition, beanReference, mockContext); + + verify(mockContext).getComponent(same(ref)); + verify(mockObjectDefinition).getProperties(); + } + + @Test + public void applyProperties_NoSetterFound_BehavesAsExpected() + throws IllegalAccessException, NoSuchFieldException, + InvocationTargetException { + final String ref = "ref"; + final String fakeComponent = "component"; + MockComponent mockComponent = new MockComponent(); + List propertyDefinitions = new ArrayList<>(); + PropertyDefinition propertyDefinition = new PropertyDefinition(); + propertyDefinition.setRef(ref); + propertyDefinition.setName("publicStr"); + propertyDefinitions.add(propertyDefinition); + + when(mockObjectDefinition.getProperties()).thenReturn(propertyDefinitions); + when(mockContext.getComponent(eq(ref))).thenReturn(fakeComponent); + + subject.applyProperties(mockObjectDefinition, mockComponent, mockContext); + + verify(mockContext).getComponent(same(ref)); + verify(mockObjectDefinition).getProperties(); + } + + public class MockComponent { + public String publicStr; + } + +} diff --git a/heron/eco/tests/java/com/twitter/heron/eco/builder/ComponentBuilderTest.java b/heron/eco/tests/java/com/twitter/heron/eco/builder/ComponentBuilderTest.java new file mode 100644 index 00000000000..d3d51027fa7 --- /dev/null +++ b/heron/eco/tests/java/com/twitter/heron/eco/builder/ComponentBuilderTest.java @@ -0,0 +1,89 @@ +// Copyright 2018 Twitter. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +package com.twitter.heron.eco.builder; + +import java.lang.reflect.InvocationTargetException; +import java.util.ArrayList; +import java.util.List; + +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.Mock; +import org.mockito.Mockito; +import org.mockito.runners.MockitoJUnitRunner; + +import com.twitter.heron.eco.definition.BeanDefinition; +import com.twitter.heron.eco.definition.EcoExecutionContext; +import com.twitter.heron.eco.definition.EcoTopologyDefinition; + +import static org.mockito.Matchers.anyObject; +import static org.mockito.Matchers.eq; +import static org.mockito.Matchers.same; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +@RunWith(MockitoJUnitRunner.class) +public class ComponentBuilderTest { + + @Mock + private EcoExecutionContext mockContext; + @Mock + private ObjectBuilder mockObjectBuilder; + + private ComponentBuilder subject; + + @Before + public void setUpForEachTestCase() { + subject = new ComponentBuilder(); + } + + @After + public void ensureNoUnexpectedMockInteractions() { + Mockito.verifyNoMoreInteractions(mockContext, + mockObjectBuilder); + } + + @Test + public void testBuildComponents_AllGood_BehavesAsExpected() throws ClassNotFoundException, + InvocationTargetException, NoSuchFieldException, + InstantiationException, IllegalAccessException { + BeanDefinition beanDefinition = new BeanDefinition(); + final String id = "bean"; + beanDefinition.setId(id); + BeanDefinition beanDefinition1 = new BeanDefinition(); + final String id1 = "bean1"; + beanDefinition1.setId(id1); + List componentDefinitions = new ArrayList<>(); + componentDefinitions.add(beanDefinition); + componentDefinitions.add(beanDefinition1); + EcoTopologyDefinition ecoTopologyDefinition = new EcoTopologyDefinition(); + ecoTopologyDefinition.setComponents(componentDefinitions); + Object object = new Object(); + Object object1 = new Object(); + + when(mockContext.getTopologyDefinition()).thenReturn(ecoTopologyDefinition); + when(mockObjectBuilder.buildObject(eq(beanDefinition), eq(mockContext))).thenReturn(object); + when(mockObjectBuilder.buildObject(eq(beanDefinition1), eq(mockContext))).thenReturn(object1); + + subject.buildComponents(mockContext, mockObjectBuilder); + + verify(mockContext).getTopologyDefinition(); + verify(mockObjectBuilder).buildObject(same(beanDefinition), same(mockContext)); + verify(mockObjectBuilder).buildObject(same(beanDefinition1), same(mockContext)); + verify(mockContext).addComponent(eq(id), anyObject()); + verify(mockContext).addComponent(eq(id1), anyObject()); + } +} diff --git a/heron/eco/tests/java/com/twitter/heron/eco/builder/ConfigBuilderTest.java b/heron/eco/tests/java/com/twitter/heron/eco/builder/ConfigBuilderTest.java new file mode 100644 index 00000000000..e65845c2dfc --- /dev/null +++ b/heron/eco/tests/java/com/twitter/heron/eco/builder/ConfigBuilderTest.java @@ -0,0 +1,63 @@ +// Copyright 2018 Twitter. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +package com.twitter.heron.eco.builder; + +import java.util.HashMap; +import java.util.Map; + +import org.apache.storm.Config; +import org.junit.Before; +import org.junit.Test; + +import com.twitter.heron.eco.definition.EcoTopologyDefinition; + +import static org.hamcrest.core.Is.is; +import static org.hamcrest.core.IsEqual.equalTo; +import static org.junit.Assert.assertThat; + +/** + * Unit tests for {@link ConfigBuilder} + */ +public class ConfigBuilderTest { + + private ConfigBuilder subject; + + @Before + public void setUpForEachTestCase() { + subject = new ConfigBuilder(); + } + + @Test + public void testBuildConfig_ConfigIsNotDefined_ReturnsEmptyConfig() { + EcoTopologyDefinition ecoTopologyDefinition = new EcoTopologyDefinition(); + + Config config = subject.buildConfig(ecoTopologyDefinition); + + assertThat(0, is(equalTo(config.size()))); + } + + @Test + public void testBuildConfig_ConfigIsDefined_ReturnsCorrectValues() { + EcoTopologyDefinition ecoTopologyDefinition = new EcoTopologyDefinition(); + Map topologyDefinitionConfig = new HashMap<>(); + topologyDefinitionConfig.put(Config.STORM_ZOOKEEPER_SERVERS, 2); + topologyDefinitionConfig.put(Config.TOPOLOGY_WORKERS, 4); + ecoTopologyDefinition.setConfig(topologyDefinitionConfig); + + Config config = subject.buildConfig(ecoTopologyDefinition); + + assertThat(config.get(Config.STORM_ZOOKEEPER_SERVERS), is(equalTo(2))); + assertThat(config.get(Config.TOPOLOGY_WORKERS), is(equalTo(4))); + } +} diff --git a/heron/eco/tests/java/com/twitter/heron/eco/builder/EcoBuilderTest.java b/heron/eco/tests/java/com/twitter/heron/eco/builder/EcoBuilderTest.java new file mode 100644 index 00000000000..1bf87b22c63 --- /dev/null +++ b/heron/eco/tests/java/com/twitter/heron/eco/builder/EcoBuilderTest.java @@ -0,0 +1,127 @@ +// Copyright 2017 Twitter. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +package com.twitter.heron.eco.builder; + +import java.lang.reflect.InvocationTargetException; +import java.util.HashMap; +import java.util.Map; + +import org.apache.storm.Config; +import org.apache.storm.topology.TopologyBuilder; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.InjectMocks; +import org.mockito.Mock; +import org.mockito.runners.MockitoJUnitRunner; + +import com.twitter.heron.eco.definition.EcoExecutionContext; +import com.twitter.heron.eco.definition.EcoTopologyDefinition; + +import static org.hamcrest.CoreMatchers.is; +import static org.hamcrest.CoreMatchers.nullValue; +import static org.hamcrest.CoreMatchers.sameInstance; +import static org.junit.Assert.assertThat; +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.eq; +import static org.mockito.Matchers.same; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyNoMoreInteractions; +import static org.mockito.Mockito.when; + +@RunWith(MockitoJUnitRunner.class) +public class EcoBuilderTest { + + @Mock + private SpoutBuilder mockSpoutBuilder; + @Mock + private BoltBuilder mockBoltBuilder; + @Mock + private StreamBuilder mockStreamBuilder; + @Mock + private ComponentBuilder mockComponentBuilder; + @Mock + private ConfigBuilder mockConfigBuilder; + @InjectMocks + private EcoBuilder subject; + + private Map configMap; + + private EcoTopologyDefinition ecoTopologyDefinition; + + @Before + public void setUpForEachTestCase() { + configMap = new HashMap<>(); + ecoTopologyDefinition = new EcoTopologyDefinition(); + ecoTopologyDefinition.setConfig(configMap); + } + + @After + public void ensureNoUnexpectedMockInteractions() { + verifyNoMoreInteractions(mockSpoutBuilder, + mockBoltBuilder, + mockStreamBuilder, + mockComponentBuilder, + mockConfigBuilder); + } + + @Test + public void testBuild_EmptyConfigMap_ReturnsDefaultConfigs() { + + Config config = new Config(); + when(mockConfigBuilder.buildConfig(eq(ecoTopologyDefinition))).thenReturn(config); + + Config returnedConfig = subject.buildConfig(ecoTopologyDefinition); + + verify(mockConfigBuilder).buildConfig(same(ecoTopologyDefinition)); + + assertThat(returnedConfig.get(Config.TOPOLOGY_DEBUG), is(nullValue())); + assertThat(config, sameInstance(returnedConfig)); + } + + @Test + public void testBuild_CustomConfigMap_ReturnsCorrectConfigs() { + configMap.put(Config.TOPOLOGY_DEBUG, false); + final String environment = "dev"; + final int spouts = 3; + configMap.put(Config.TOPOLOGY_ENVIRONMENT, environment); + configMap.put(Config.TOPOLOGY_MAX_SPOUT_PENDING, spouts); + + Config config = new Config(); + + when(mockConfigBuilder.buildConfig(eq(ecoTopologyDefinition))).thenReturn(config); + + assertThat(subject.buildConfig(ecoTopologyDefinition), sameInstance(config)); + + verify(mockConfigBuilder).buildConfig(same(ecoTopologyDefinition)); + } + + @Test + public void testBuildTopologyBuilder_BuildsAsExpected() + throws IllegalAccessException, ClassNotFoundException, + InstantiationException, NoSuchFieldException, InvocationTargetException { + Config config = new Config(); + EcoExecutionContext context = new EcoExecutionContext(ecoTopologyDefinition, config); + ObjectBuilder objectBuilder = new ObjectBuilder(); + subject.buildTopologyBuilder(context, objectBuilder); + + verify(mockSpoutBuilder).buildSpouts(same(context), + any(TopologyBuilder.class), same(objectBuilder)); + verify(mockBoltBuilder).buildBolts(same(context), same(objectBuilder)); + verify(mockStreamBuilder).buildStreams(same(context), any(TopologyBuilder.class), + same(objectBuilder)); + verify(mockComponentBuilder).buildComponents(same(context), same(objectBuilder)); + } +} diff --git a/heron/eco/tests/java/com/twitter/heron/eco/builder/ObjectBuilderTest.java b/heron/eco/tests/java/com/twitter/heron/eco/builder/ObjectBuilderTest.java new file mode 100644 index 00000000000..d1ee8782e95 --- /dev/null +++ b/heron/eco/tests/java/com/twitter/heron/eco/builder/ObjectBuilderTest.java @@ -0,0 +1,159 @@ +// Copyright 2018 Twitter. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +package com.twitter.heron.eco.builder; + +import java.lang.reflect.InvocationTargetException; +import java.util.ArrayList; +import java.util.List; + +import org.apache.storm.testing.FixedTuple; +import org.apache.storm.testing.TestWordSpout; +import org.junit.After; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.InjectMocks; +import org.mockito.Mock; +import org.mockito.Mockito; +import org.mockito.runners.MockitoJUnitRunner; + +import com.twitter.heron.eco.definition.ConfigurationMethodDefinition; +import com.twitter.heron.eco.definition.EcoExecutionContext; +import com.twitter.heron.eco.definition.ObjectDefinition; + +import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.CoreMatchers.is; +import static org.hamcrest.core.IsInstanceOf.instanceOf; +import static org.junit.Assert.assertThat; + +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyListOf; +import static org.mockito.Matchers.anyObject; +import static org.mockito.Matchers.eq; +import static org.mockito.Matchers.same; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + + +@RunWith(MockitoJUnitRunner.class) +@SuppressWarnings({"rawtypes", "unchecked"}) +public class ObjectBuilderTest { + + @Mock + private ObjectDefinition mockObjectDefinition; + @Mock + private EcoExecutionContext mockContext; + @Mock + private BuilderUtility mockBuilderUtility; + @Mock + private ConfigurationMethodDefinition mockMethodDefinition; + @InjectMocks + private ObjectBuilder subject; + + @After + public void ensureNoUnexpectedMockInteractions() { + Mockito.verifyNoMoreInteractions(mockContext, + mockObjectDefinition, + mockBuilderUtility, + mockMethodDefinition); + } + + @Test + public void buildObject_WithArgsBeanReferenceAndOther_BehavesAsExpected() + throws ClassNotFoundException, + InvocationTargetException, NoSuchFieldException, + InstantiationException, IllegalAccessException { + final String beanReference1 = "bean1"; + List constructorArgs = new ArrayList<>(); + List objects = new ArrayList<>(); + List firstObject = new ArrayList<>(); + objects.add(firstObject); + constructorArgs.add(objects); + Object someComponent = new Object(); + final String className = FixedTuple.class.getName(); + final Class testClass = FixedTuple.class; + final String methodName = "toString"; + List methodDefinitions = new ArrayList<>(); + + methodDefinitions.add(mockMethodDefinition); + + when(mockObjectDefinition.getClassName()).thenReturn(className); + when(mockObjectDefinition.hasConstructorArgs()).thenReturn(true); + when(mockObjectDefinition.getConstructorArgs()).thenReturn(constructorArgs); + when(mockObjectDefinition.hasReferences()).thenReturn(true); + when(mockContext.getComponent(eq(beanReference1))).thenReturn(someComponent); + when(mockBuilderUtility.resolveReferences(eq(constructorArgs), eq(mockContext))) + .thenCallRealMethod(); + when(mockBuilderUtility.classForName(eq(className))).thenReturn(testClass); + when(mockObjectDefinition.getConfigMethods()).thenReturn(methodDefinitions); + when(mockMethodDefinition.hasReferences()).thenReturn(true); + when(mockMethodDefinition.getArgs()).thenReturn(null); + when(mockMethodDefinition.getName()).thenReturn(methodName); + + Object object = subject.buildObject(mockObjectDefinition, mockContext); + + verify(mockObjectDefinition).getClassName(); + verify(mockObjectDefinition).hasConstructorArgs(); + verify(mockObjectDefinition).getConstructorArgs(); + verify(mockObjectDefinition).hasReferences(); + verify(mockBuilderUtility).classForName(same(className)); + verify(mockBuilderUtility).resolveReferences(same(constructorArgs), same(mockContext)); + verify(mockBuilderUtility).applyProperties(eq(mockObjectDefinition), any(Object.class), + same(mockContext)); + verify(mockObjectDefinition).getConfigMethods(); + verify(mockMethodDefinition).hasReferences(); + verify(mockMethodDefinition).getArgs(); + verify(mockBuilderUtility, times(2)).resolveReferences(anyListOf(Object.class), + same(mockContext)); + verify(mockMethodDefinition).getName(); + + assertThat(object, is(instanceOf(FixedTuple.class))); + FixedTuple fixedTuple = (FixedTuple) object; + assertThat(fixedTuple.values, is(equalTo(objects))); + assertThat(fixedTuple.values.get(0), is(equalTo(firstObject))); + } + + @Test + public void buildObject_NoArgs_BehavesAsExpected() + throws ClassNotFoundException, InvocationTargetException, + NoSuchFieldException, InstantiationException, IllegalAccessException { + + final Class fixedTupleClass = TestWordSpout.class; + final String className = TestWordSpout.class.getName(); + List methodDefinitions = new ArrayList<>(); + final String methodName = "close"; + + methodDefinitions.add(mockMethodDefinition); + + when(mockObjectDefinition.getClassName()).thenReturn(className); + when(mockObjectDefinition.hasConstructorArgs()).thenReturn(false); + when(mockBuilderUtility.classForName(eq(className))).thenReturn(fixedTupleClass); + when(mockObjectDefinition.getConfigMethods()).thenReturn(methodDefinitions); + when(mockMethodDefinition.hasReferences()).thenReturn(false); + when(mockMethodDefinition.getName()).thenReturn(methodName); + + subject.buildObject(mockObjectDefinition, mockContext); + + verify(mockObjectDefinition).getClassName(); + verify(mockObjectDefinition).hasConstructorArgs(); + verify(mockBuilderUtility).classForName(same(className)); + verify(mockBuilderUtility).applyProperties(same(mockObjectDefinition), + anyObject(), same(mockContext)); + verify(mockObjectDefinition).getConfigMethods(); + verify(mockMethodDefinition).hasReferences(); + verify(mockMethodDefinition).getName(); + verify(mockMethodDefinition).getArgs(); + } +} + diff --git a/heron/eco/tests/java/com/twitter/heron/eco/builder/SpoutBuilderTest.java b/heron/eco/tests/java/com/twitter/heron/eco/builder/SpoutBuilderTest.java new file mode 100644 index 00000000000..12abe933091 --- /dev/null +++ b/heron/eco/tests/java/com/twitter/heron/eco/builder/SpoutBuilderTest.java @@ -0,0 +1,159 @@ +// Copyright 2018 Twitter. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +package com.twitter.heron.eco.builder; + +import java.lang.reflect.InvocationTargetException; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + +import org.apache.storm.spout.SpoutOutputCollector; +import org.apache.storm.task.TopologyContext; +import org.apache.storm.topology.IRichSpout; +import org.apache.storm.topology.OutputFieldsDeclarer; +import org.apache.storm.topology.TopologyBuilder; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.Mock; +import org.mockito.Mockito; +import org.mockito.runners.MockitoJUnitRunner; + +import com.twitter.heron.eco.definition.EcoExecutionContext; +import com.twitter.heron.eco.definition.EcoTopologyDefinition; +import com.twitter.heron.eco.definition.SpoutDefinition; + +import static org.mockito.Matchers.anyObject; +import static org.mockito.Matchers.eq; +import static org.mockito.Matchers.same; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +@RunWith(MockitoJUnitRunner.class) +public class SpoutBuilderTest { + + @Mock + private EcoExecutionContext mockContext; + @Mock + private TopologyBuilder mockTopologyBuilder; + @Mock + private ObjectBuilder mockObjectBuilder; + + private SpoutBuilder subject; + + @Before + public void setUpForEachTestCase() { + subject = new SpoutBuilder(); + } + + @After + public void ensureNoUnexpectedMockInteractions() { + Mockito.verifyNoMoreInteractions(mockContext, + mockTopologyBuilder, + mockObjectBuilder); + } + + @Test + public void testBuildSpouts_AllGood_BehavesAsExpected() throws ClassNotFoundException, + InvocationTargetException, NoSuchFieldException, + InstantiationException, IllegalAccessException { + EcoTopologyDefinition topologyDefinition = new EcoTopologyDefinition(); + + SpoutDefinition spoutDefinition = new SpoutDefinition(); + final String id = "id"; + final int parallelism = 2; + spoutDefinition.setId(id); + spoutDefinition.setParallelism(parallelism); + SpoutDefinition spoutDefinition1 = new SpoutDefinition(); + final String id1 = "id1"; + final int parallelism1 = 3; + spoutDefinition1.setId(id1); + spoutDefinition1.setParallelism(parallelism1); + List spoutDefinitions = new ArrayList<>(); + spoutDefinitions.add(spoutDefinition); + spoutDefinitions.add(spoutDefinition1); + topologyDefinition.setSpouts(spoutDefinitions); + MockSpout mockSpout = new MockSpout(); + MockSpout mockSpout1 = new MockSpout(); + + when(mockObjectBuilder.buildObject(eq(spoutDefinition), + eq(mockContext))).thenReturn(mockSpout); + when(mockObjectBuilder.buildObject(eq(spoutDefinition1), + eq(mockContext))).thenReturn(mockSpout1); + when(mockContext.getTopologyDefinition()).thenReturn(topologyDefinition); + + subject.buildSpouts(mockContext, mockTopologyBuilder, mockObjectBuilder); + + verify(mockContext).getTopologyDefinition(); + verify(mockObjectBuilder).buildObject(same(spoutDefinition), same(mockContext)); + verify(mockObjectBuilder).buildObject(same(spoutDefinition1), same(mockContext)); + verify(mockTopologyBuilder).setSpout(eq(id), eq(mockSpout), eq(parallelism)); + verify(mockTopologyBuilder).setSpout(eq(id1), eq(mockSpout1), eq(parallelism1)); + verify(mockContext).addSpout(eq(id), anyObject()); + verify(mockContext).addSpout(eq(id1), anyObject()); + } + + @SuppressWarnings("serial") + private class MockSpout implements IRichSpout { + + @Override + public void open(Map conf, + TopologyContext context, SpoutOutputCollector collector) { + + } + + @Override + public void close() { + + } + + @Override + public void activate() { + + } + + @Override + public void deactivate() { + + } + + @Override + public void nextTuple() { + + } + + @Override + public void ack(Object msgId) { + + } + + @Override + public void fail(Object msgId) { + + } + + @Override + public void declareOutputFields(OutputFieldsDeclarer declarer) { + + } + + @Override + public Map getComponentConfiguration() { + return null; + } + } + + +} diff --git a/heron/eco/tests/java/com/twitter/heron/eco/builder/StreamBuilderTest.java b/heron/eco/tests/java/com/twitter/heron/eco/builder/StreamBuilderTest.java new file mode 100644 index 00000000000..9c9b868f731 --- /dev/null +++ b/heron/eco/tests/java/com/twitter/heron/eco/builder/StreamBuilderTest.java @@ -0,0 +1,370 @@ +// Copyright 2018 Twitter. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +package com.twitter.heron.eco.builder; + +import java.lang.reflect.InvocationTargetException; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + +import org.apache.storm.generated.GlobalStreamId; +import org.apache.storm.grouping.CustomStreamGrouping; +import org.apache.storm.task.OutputCollector; +import org.apache.storm.task.TopologyContext; +import org.apache.storm.task.WorkerTopologyContext; +import org.apache.storm.topology.BasicOutputCollector; +import org.apache.storm.topology.BoltDeclarer; +import org.apache.storm.topology.IBasicBolt; +import org.apache.storm.topology.IRichBolt; +import org.apache.storm.topology.IWindowedBolt; +import org.apache.storm.topology.OutputFieldsDeclarer; +import org.apache.storm.topology.TopologyBuilder; +import org.apache.storm.tuple.Fields; +import org.apache.storm.tuple.Tuple; +import org.apache.storm.windowing.TimestampExtractor; +import org.apache.storm.windowing.TupleWindow; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.Mock; +import org.mockito.Mockito; +import org.mockito.runners.MockitoJUnitRunner; + +import com.twitter.heron.eco.definition.EcoExecutionContext; +import com.twitter.heron.eco.definition.EcoTopologyDefinition; +import com.twitter.heron.eco.definition.GroupingDefinition; +import com.twitter.heron.eco.definition.ObjectDefinition; +import com.twitter.heron.eco.definition.StreamDefinition; + +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyMap; +import static org.mockito.Matchers.eq; +import static org.mockito.Matchers.same; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +@RunWith(MockitoJUnitRunner.class) +public class StreamBuilderTest { + + @Mock + private EcoTopologyDefinition mockDefinition; + @Mock + private EcoExecutionContext mockContext; + @Mock + private TopologyBuilder mockTopologyBuilder; + @Mock + private ObjectBuilder mockObjectBuilder; + @Mock + private BoltDeclarer mockBoltDeclarer; + + private StreamBuilder subject; + + @Before + public void setUpForEachTestCase() { + subject = new StreamBuilder(); + } + + @After + public void ensureNoUnexpectedMockInteractions() { + Mockito.verifyNoMoreInteractions(mockDefinition, + mockContext, + mockTopologyBuilder, + mockObjectBuilder, + mockBoltDeclarer); + } + + @Test + @SuppressWarnings("unchecked") + public void buildStreams_SpoutToIRichBolt_ShuffleGrouping() throws ClassNotFoundException, + InvocationTargetException, NoSuchFieldException, + InstantiationException, IllegalAccessException { + final int iRichBoltParallelism = 1; + final String to = "to"; + final String from = "from"; + final String streamId = "id"; + StreamDefinition streamDefinition = new StreamDefinition(); + streamDefinition.setFrom(from); + streamDefinition.setTo(to); + streamDefinition.setId(streamId); + List streams = new ArrayList<>(); + streams.add(streamDefinition); + GroupingDefinition groupingDefinition = new GroupingDefinition(); + groupingDefinition.setType(GroupingDefinition.Type.SHUFFLE); + groupingDefinition.setStreamId(streamId); + streamDefinition.setGrouping(groupingDefinition); + MockIRichBolt mockIRichBolt = new MockIRichBolt(); + + when(mockContext.getTopologyDefinition()).thenReturn(mockDefinition); + when(mockContext.getBolt(eq(to))).thenReturn(mockIRichBolt); + when(mockDefinition.getStreams()).thenReturn(streams); + when(mockDefinition.parallelismForBolt(eq(to))).thenReturn(iRichBoltParallelism); + when(mockTopologyBuilder.setBolt(eq(to), + eq(mockIRichBolt), eq(iRichBoltParallelism))).thenReturn(mockBoltDeclarer); + + subject.buildStreams(mockContext, mockTopologyBuilder, mockObjectBuilder); + + verify(mockContext).getTopologyDefinition(); + verify(mockContext).getBolt(eq(to)); + verify(mockDefinition).parallelismForBolt(eq(to)); + verify(mockTopologyBuilder).setBolt(eq(to), eq(mockIRichBolt), eq(iRichBoltParallelism)); + verify(mockBoltDeclarer).shuffleGrouping(eq(from), eq(streamId)); + verify(mockContext).setStreams(anyMap()); + verify(mockDefinition).getStreams(); + } + + @Test + @SuppressWarnings("unchecked") + public void buildStreams_SpoutToIBasicBolt_FieldsGroupingWithArgs() throws + ClassNotFoundException, + InvocationTargetException, NoSuchFieldException, + InstantiationException, IllegalAccessException { + final int iRichBoltParallelism = 1; + final String to = "to"; + final String from = "from"; + final String streamId = "id"; + StreamDefinition streamDefinition = new StreamDefinition(); + streamDefinition.setFrom(from); + streamDefinition.setTo(to); + streamDefinition.setId(streamId); + List streams = new ArrayList<>(); + streams.add(streamDefinition); + GroupingDefinition groupingDefinition = new GroupingDefinition(); + groupingDefinition.setType(GroupingDefinition.Type.FIELDS); + List args = new ArrayList<>(); + args.add("arg1"); + groupingDefinition.setArgs(args); + groupingDefinition.setStreamId(streamId); + streamDefinition.setGrouping(groupingDefinition); + MockIBasicBolt mockIBasicBolt = new MockIBasicBolt(); + + when(mockContext.getTopologyDefinition()).thenReturn(mockDefinition); + when(mockContext.getBolt(eq(to))).thenReturn(mockIBasicBolt); + when(mockDefinition.getStreams()).thenReturn(streams); + when(mockDefinition.parallelismForBolt(eq(to))).thenReturn(iRichBoltParallelism); + when(mockTopologyBuilder.setBolt(eq(to), + eq(mockIBasicBolt), eq(iRichBoltParallelism))).thenReturn(mockBoltDeclarer); + + subject.buildStreams(mockContext, mockTopologyBuilder, mockObjectBuilder); + + verify(mockContext).getTopologyDefinition(); + verify(mockContext).getBolt(eq(to)); + verify(mockDefinition).parallelismForBolt(eq(to)); + verify(mockTopologyBuilder).setBolt(eq(to), eq(mockIBasicBolt), eq(iRichBoltParallelism)); + verify(mockBoltDeclarer).fieldsGrouping(eq(from), eq(streamId), any(Fields.class)); + verify(mockContext).setStreams(anyMap()); + verify(mockDefinition).getStreams(); + } + + @Test(expected = IllegalArgumentException.class) + @SuppressWarnings("unchecked") + public void buildStreams_SpoutToIBasicBolt_FieldsGroupingWithoutArgs_ExceptionThrown() throws + ClassNotFoundException, + InvocationTargetException, NoSuchFieldException, + InstantiationException, IllegalAccessException { + final int iRichBoltParallelism = 1; + final String to = "to"; + final String from = "from"; + final String streamId = "id"; + StreamDefinition streamDefinition = new StreamDefinition(); + streamDefinition.setFrom(from); + streamDefinition.setTo(to); + streamDefinition.setId(streamId); + List streams = new ArrayList<>(); + streams.add(streamDefinition); + GroupingDefinition groupingDefinition = new GroupingDefinition(); + groupingDefinition.setType(GroupingDefinition.Type.FIELDS); + + groupingDefinition.setStreamId(streamId); + streamDefinition.setGrouping(groupingDefinition); + MockIBasicBolt mockIBasicBolt = new MockIBasicBolt(); + try { + when(mockContext.getTopologyDefinition()).thenReturn(mockDefinition); + when(mockContext.getBolt(eq(to))).thenReturn(mockIBasicBolt); + when(mockDefinition.getStreams()).thenReturn(streams); + when(mockDefinition.parallelismForBolt(eq(to))).thenReturn(iRichBoltParallelism); + when(mockTopologyBuilder.setBolt(eq(to), + eq(mockIBasicBolt), eq(iRichBoltParallelism))).thenReturn(mockBoltDeclarer); + + subject.buildStreams(mockContext, mockTopologyBuilder, mockObjectBuilder); + } finally { + + verify(mockContext).getTopologyDefinition(); + verify(mockContext).getBolt(eq(to)); + verify(mockDefinition).parallelismForBolt(eq(to)); + verify(mockTopologyBuilder).setBolt(eq(to), eq(mockIBasicBolt), eq(iRichBoltParallelism)); + verify(mockDefinition).getStreams(); + } + } + + @Test + @SuppressWarnings("unchecked") + public void buildStreams_SpoutToIWindowedBolt_CustomGrouping() throws ClassNotFoundException, + InvocationTargetException, NoSuchFieldException, + InstantiationException, IllegalAccessException { + final int iRichBoltParallelism = 1; + final String to = "to"; + final String from = "from"; + final String streamId = "id"; + StreamDefinition streamDefinition = new StreamDefinition(); + streamDefinition.setFrom(from); + streamDefinition.setTo(to); + streamDefinition.setId(streamId); + List streams = new ArrayList<>(); + streams.add(streamDefinition); + GroupingDefinition groupingDefinition = new GroupingDefinition(); + groupingDefinition.setType(GroupingDefinition.Type.CUSTOM); + MockCustomObjectDefinition mockCustomObjectDefinition = new MockCustomObjectDefinition(); + + groupingDefinition.setCustomClass(mockCustomObjectDefinition); + List args = new ArrayList<>(); + args.add("arg1"); + groupingDefinition.setArgs(args); + groupingDefinition.setStreamId(streamId); + streamDefinition.setGrouping(groupingDefinition); + MockIWindowedBolt mockIWindowedBolt = new MockIWindowedBolt(); + MockCustomStreamGrouping mockCustomStreamGrouping = new MockCustomStreamGrouping(); + + when(mockContext.getTopologyDefinition()).thenReturn(mockDefinition); + when(mockContext.getBolt(eq(to))).thenReturn(mockIWindowedBolt); + when(mockDefinition.getStreams()).thenReturn(streams); + when(mockDefinition.parallelismForBolt(eq(to))).thenReturn(iRichBoltParallelism); + when(mockTopologyBuilder.setBolt(eq(to), + eq(mockIWindowedBolt), eq(iRichBoltParallelism))).thenReturn(mockBoltDeclarer); + when(mockObjectBuilder.buildObject(eq(mockCustomObjectDefinition), + eq(mockContext))).thenReturn(mockCustomStreamGrouping); + + subject.buildStreams(mockContext, mockTopologyBuilder, mockObjectBuilder); + + verify(mockContext).getTopologyDefinition(); + verify(mockContext).getBolt(eq(to)); + verify(mockDefinition).parallelismForBolt(eq(to)); + verify(mockTopologyBuilder).setBolt(eq(to), eq(mockIWindowedBolt), eq(iRichBoltParallelism)); + verify(mockBoltDeclarer).customGrouping(eq(from), eq(streamId), eq(mockCustomStreamGrouping)); + verify(mockContext).setStreams(anyMap()); + verify(mockDefinition).getStreams(); + verify(mockObjectBuilder).buildObject(same(mockCustomObjectDefinition), same(mockContext)); + } + + private class MockCustomObjectDefinition extends ObjectDefinition { + + } + + @SuppressWarnings("serial") + private class MockCustomStreamGrouping implements CustomStreamGrouping { + + @Override + public void prepare(WorkerTopologyContext context, GlobalStreamId stream, + List targetTasks) { + + } + + @Override + public List chooseTasks(int taskId, List values) { + return null; + } + } + + @SuppressWarnings({"rawtypes", "unchecked", "serial"}) + private class MockIRichBolt implements IRichBolt { + + @Override + public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) { + + } + + @Override + public void execute(Tuple input) { + + } + + @Override + public void cleanup() { + + } + + @Override + public void declareOutputFields(OutputFieldsDeclarer declarer) { + + } + + @Override + public Map getComponentConfiguration() { + return null; + } + } + + @SuppressWarnings({"rawtypes", "unchecked", "serial"}) + private class MockIWindowedBolt implements IWindowedBolt { + @Override + public void prepare(Map topoConf, + TopologyContext context, OutputCollector collector) { + + } + + @Override + public void execute(TupleWindow inputWindow) { + + } + + @Override + public void cleanup() { + + } + + @Override + public TimestampExtractor getTimestampExtractor() { + return null; + } + + @Override + public void declareOutputFields(OutputFieldsDeclarer declarer) { + + } + + @Override + public Map getComponentConfiguration() { + return null; + } + } + + + @SuppressWarnings({"rawtypes", "unchecked", "serial"}) + public class MockIBasicBolt implements IBasicBolt { + @Override + public void prepare(Map stormConf, TopologyContext context) { + + } + + @Override + public void execute(Tuple input, BasicOutputCollector collector) { + + } + + @Override + public void cleanup() { + + } + + @Override + public void declareOutputFields(OutputFieldsDeclarer declarer) { + + } + + @Override + public Map getComponentConfiguration() { + return null; + } + } +} diff --git a/heron/eco/tests/java/com/twitter/heron/eco/parser/EcoParserTest.java b/heron/eco/tests/java/com/twitter/heron/eco/parser/EcoParserTest.java new file mode 100644 index 00000000000..24d226f3c51 --- /dev/null +++ b/heron/eco/tests/java/com/twitter/heron/eco/parser/EcoParserTest.java @@ -0,0 +1,407 @@ +// Copyright 2017 Twitter. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +package com.twitter.heron.eco.parser; + +import java.io.ByteArrayInputStream; +import java.io.InputStream; +import java.util.List; + +import org.junit.Before; +import org.junit.Test; + +import com.twitter.heron.eco.definition.BeanDefinition; +import com.twitter.heron.eco.definition.BeanReference; +import com.twitter.heron.eco.definition.BoltDefinition; +import com.twitter.heron.eco.definition.EcoTopologyDefinition; +import com.twitter.heron.eco.definition.GroupingDefinition; +import com.twitter.heron.eco.definition.PropertyDefinition; +import com.twitter.heron.eco.definition.StreamDefinition; + +import static junit.framework.TestCase.assertNotNull; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; + + +/** + * Unit tests for {@link EcoParser} + */ +public class EcoParserTest { + + + private static final String BOLT_1 = "bolt-1"; + private static final String BOLT_2 = "bolt-2"; + private static final String YAML_NO_CONFIG_STR = "# Licensed to the Apache Software Foundation" + + " (ASF) under one\n" + + "# or more contributor license agreements. See the NOTICE file\n" + + "# distributed with this work for additional information\n" + + "# regarding copyright ownership. The ASF licenses this file\n" + + "# to you under the Apache License, Version 2.0 (the\n" + + "# \"License\"); you may not use this file except in compliance\n" + + "# with the License. You may obtain a copy of the License at\n" + + "#\n" + + "# http://www.apache.org/licenses/LICENSE-2.0\n" + + "#\n" + + "# Unless required by applicable law or agreed to in writing, software\n" + + "# distributed under the License is distributed on an \"AS IS\" BASIS,\n" + + "# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\n" + + "# See the License for the specific language governing permissions and\n" + + "# limitations under the License.\n" + + "\n" + + "---\n" + + "\n" + + "# topology definition\n" + + "# name to be used when submitting\n" + + "name: \"yaml-topology\"\n" + + "\n" + + "# topology configuration\n" + + "# this will be passed to the submitter as a map of config options\n" + + "#\n" + + "# spout definitions\n" + + "spouts:\n" + + " - id: \"spout-1\"\n" + + " className: \"com.twitter.heron.sample.TestWordSpout\"\n" + + " parallelism: 1\n" + + "\n" + + "# bolt definitions\n" + + "bolts:\n" + + " - id: \"bolt-1\"\n" + + " className: \"com.twitter.heron.sample.TestWordCounter\"\n" + + " parallelism: 2\n" + + "\n" + + " - id: \"bolt-2\"\n" + + " className: \"com.twitter.heron.sample.LogInfoBolt\"\n" + + " parallelism: 1\n" + + "\n" + + "#stream definitions\n" + + "# stream definitions define connections between spouts and bolts.\n" + + "# note that such connections can be cyclical\n" + + "streams:\n" + + " - name: \"spout-1 --> bolt-1\" # name isn't used (placeholder for logging, UI, etc.)\n" + + " id: \"connection-1\"\n" + + " from: \"spout-1\"\n" + + " to: \"bolt-1\"\n" + + " grouping:\n" + + " type: FIELDS\n" + + " args: [\"word\"]\n" + + "\n" + + " - name: \"bolt-1 --> bolt2\"\n" + + " id: \"connection-2\"\n" + + " from: \"bolt-1\"\n" + + " to: \"bolt-2\"\n" + + " grouping:\n" + + " type: SHUFFLE"; + private static final String YAML_STR = "# Licensed to the Apache Software Foundation" + + " (ASF) under one\n" + + "# or more contributor license agreements. See the NOTICE file\n" + + "# distributed with this work for additional information\n" + + "# regarding copyright ownership. The ASF licenses this file\n" + + "# to you under the Apache License, Version 2.0 (the\n" + + "# \"License\"); you may not use this file except in compliance\n" + + "# with the License. You may obtain a copy of the License at\n" + + "#\n" + + "# http://www.apache.org/licenses/LICENSE-2.0\n" + + "#\n" + + "# Unless required by applicable law or agreed to in writing, software\n" + + "# distributed under the License is distributed on an \"AS IS\" BASIS,\n" + + "# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\n" + + "# See the License for the specific language governing permissions and\n" + + "# limitations under the License.\n" + + "\n" + + "---\n" + + "\n" + + "# topology definition\n" + + "# name to be used when submitting\n" + + "name: \"yaml-topology\"\n" + + "\n" + + "# topology configuration\n" + + "# this will be passed to the submitter as a map of config options\n" + + "#\n" + + "config:\n" + + " topology.workers: 1\n" + + "\n" + + "# spout definitions\n" + + "spouts:\n" + + " - id: \"spout-1\"\n" + + " className: \"com.twitter.heron.sample.TestWordSpout\"\n" + + " parallelism: 1\n" + + "\n" + + "# bolt definitions\n" + + "bolts:\n" + + " - id: \"bolt-1\"\n" + + " className: \"com.twitter.heron.sample.TestWordCounter\"\n" + + " parallelism: 2\n" + + "\n" + + " - id: \"bolt-2\"\n" + + " className: \"com.twitter.heron.sample.LogInfoBolt\"\n" + + " parallelism: 1\n" + + "\n" + + "#stream definitions\n" + + "# stream definitions define connections between spouts and bolts.\n" + + "# note that such connections can be cyclical\n" + + "streams:\n" + + " - name: \"spout-1 --> bolt-1\" # name isn't used (placeholder for logging, UI, etc.)\n" + + " id: \"connection-1\"\n" + + " from: \"spout-1\"\n" + + " to: \"bolt-1\"\n" + + " grouping:\n" + + " type: FIELDS\n" + + " args: [\"word\"]\n" + + "\n" + + " - name: \"bolt-1 --> bolt2\"\n" + + " id: \"connection-2\"\n" + + " from: \"bolt-1\"\n" + + " to: \"bolt-2\"\n" + + " grouping:\n" + + " type: SHUFFLE"; + + private static final String YAML_STR_1 = "# Test ability to wire together shell spouts/bolts\n" + + "---\n" + + "\n" + + "name: \"kafka-topology\"\n" + + "\n" + + "# Components\n" + + "# Components are analagous to Spring beans. They are meant to be used as constructor,\n" + + "# property(setter), and builder arguments.\n" + + "#\n" + + "# for the time being, components must be declared in the order they are referenced\n" + + "components:\n" + + " - id: \"stringScheme\"\n" + + " className: \"org.apache.storm.kafka.StringScheme\"\n" + + "\n" + + " - id: \"stringMultiScheme\"\n" + + " className: \"org.apache.storm.spout.SchemeAsMultiScheme\"\n" + + " constructorArgs:\n" + + " - ref: \"stringScheme\"\n" + + "\n" + + " - id: \"zkHosts\"\n" + + " className: \"org.apache.storm.kafka.ZkHosts\"\n" + + " constructorArgs:\n" + + " - \"localhost:2181\"\n" + + "\n" + + "# Alternative kafka config\n" + + "# - id: \"kafkaConfig\"\n" + + "# className: \"org.apache.storm.kafka.KafkaConfig\"\n" + + "# constructorArgs:\n" + + "# # brokerHosts\n" + + "# - ref: \"zkHosts\"\n" + + "# # topic\n" + + "# - \"myKafkaTopic\"\n" + + "# # clientId (optional)\n" + + "# - \"myKafkaClientId\"\n" + + "\n" + + " - id: \"spoutConfig\"\n" + + " className: \"org.apache.storm.kafka.SpoutConfig\"\n" + + " constructorArgs:\n" + + " # brokerHosts\n" + + " - ref: \"zkHosts\"\n" + + " # topic\n" + + " - \"myKafkaTopic\"\n" + + " # zkRoot\n" + + " - \"/kafkaSpout\"\n" + + " # id\n" + + " - \"myId\"\n" + + " properties:\n" + + " - name: \"ignoreZkOffsets\"\n" + + " value: true\n" + + " - name: \"scheme\"\n" + + " ref: \"stringMultiScheme\"\n" + + "\n" + + "\n" + + "\n" + + "# topology configuration\n" + + "# this will be passed to the submitter as a map of config options\n" + + "#\n" + + "config:\n" + + " topology.workers: 1\n" + + " # ...\n" + + "\n" + + "# spout definitions\n" + + "spouts:\n" + + " - id: \"kafka-spout\"\n" + + " className: \"org.apache.storm.kafka.KafkaSpout\"\n" + + " constructorArgs:\n" + + " - ref: \"spoutConfig\"\n" + + "\n" + + "# bolt definitions\n" + + "bolts:\n" + + " - id: \"splitsentence\"\n" + + " className: \"org.apache.storm.flux.wrappers.bolts.FluxShellBolt\"\n" + + " constructorArgs:\n" + + " # command line\n" + + " - [\"python\", \"splitsentence.py\"]\n" + + " # output fields\n" + + " - [\"word\"]\n" + + " parallelism: 1\n" + + " # ...\n" + + "\n" + + " - id: \"log\"\n" + + " className: \"org.apache.storm.flux.wrappers.bolts.LogInfoBolt\"\n" + + " parallelism: 1\n" + + " # ...\n" + + "\n" + + " - id: \"count\"\n" + + " className: \"org.apache.storm.testing.TestWordCounter\"\n" + + " parallelism: 1\n" + + " # ...\n" + + "\n" + + "#stream definitions\n" + + "# stream definitions define connections between spouts and bolts.\n" + + "# note that such connections can be cyclical\n" + + "# custom stream groupings are also supported\n" + + "\n" + + "streams:\n" + + " - name: \"kafka --> split\" # name isn't used (placeholder for logging, UI, etc.)\n" + + " id: \"stream1\"\n" + + " from: \"kafka-spout\"\n" + + " to: \"splitsentence\"\n" + + " grouping:\n" + + " type: SHUFFLE\n" + + "\n" + + " - name: \"split --> count\"\n" + + " id: \"stream2\"\n" + + " from: \"splitsentence\"\n" + + " to: \"count\"\n" + + " grouping:\n" + + " type: FIELDS\n" + + " args: [\"word\"]\n" + + "\n" + + " - name: \"count --> log\"\n" + + " id: \"stream3\"\n" + + " from: \"count\"\n" + + " to: \"log\"\n" + + " grouping:\n" + + " type: SHUFFLE"; + private EcoParser subject; + + @Before + public void setUpBeforeEachTestCase() { + subject = new EcoParser(); + } + + + @Test + public void testParseFromInputStream_VerifyComponents_MapsAsExpected() throws Exception { + + InputStream inputStream = new ByteArrayInputStream(YAML_STR_1.getBytes()); + + EcoTopologyDefinition topologyDefinition = subject.parseFromInputStream(inputStream); + List components = topologyDefinition.getComponents(); + + assertEquals("kafka-topology", topologyDefinition.getName()); + assertEquals(4, components.size()); + + BeanDefinition stringSchemeComponent = components.get(0); + assertEquals("stringScheme", stringSchemeComponent.getId()); + assertEquals("org.apache.storm.kafka.StringScheme", stringSchemeComponent.getClassName()); + + + BeanDefinition stringMultiSchemeComponent = components.get(1); + assertEquals("stringMultiScheme", stringMultiSchemeComponent.getId()); + assertEquals("org.apache.storm.spout.SchemeAsMultiScheme", + stringMultiSchemeComponent.getClassName()); + assertEquals(1, stringMultiSchemeComponent.getConstructorArgs().size()); + BeanReference multiStringReference = + (BeanReference) stringMultiSchemeComponent.getConstructorArgs().get(0); + assertEquals("stringScheme", multiStringReference.getId()); + + BeanDefinition zkHostsComponent = components.get(2); + assertEquals("zkHosts", zkHostsComponent.getId()); + assertEquals("org.apache.storm.kafka.ZkHosts", zkHostsComponent.getClassName()); + assertEquals(1, zkHostsComponent.getConstructorArgs().size()); + assertEquals("localhost:2181", zkHostsComponent.getConstructorArgs().get(0)); + + BeanDefinition spoutConfigComponent = components.get(3); + List spoutConstructArgs = spoutConfigComponent.getConstructorArgs(); + assertEquals("spoutConfig", spoutConfigComponent.getId()); + assertEquals("org.apache.storm.kafka.SpoutConfig", spoutConfigComponent.getClassName()); + BeanReference spoutBrokerHostComponent = (BeanReference) spoutConstructArgs.get(0); + assertEquals("zkHosts", spoutBrokerHostComponent.getId()); + assertEquals("myKafkaTopic", spoutConstructArgs.get(1)); + assertEquals("/kafkaSpout", spoutConstructArgs.get(2)); + List properties = spoutConfigComponent.getProperties(); + assertEquals("ignoreZkOffsets", properties.get(0).getName()); + assertEquals(true, properties.get(0).getValue()); + assertEquals("scheme", properties.get(1).getName()); + assertEquals(true, properties.get(1).isReference()); + assertEquals("stringMultiScheme", properties.get(1).getRef()); + } + + @Test + public void testParseFromInputStream_VerifyAllButComponents_MapsAsExpected() throws Exception { + + InputStream inputStream = new ByteArrayInputStream(YAML_STR.getBytes()); + + EcoTopologyDefinition topologyDefinition = subject.parseFromInputStream(inputStream); + + assertEquals("yaml-topology", topologyDefinition.getName()); + assertEquals(1, topologyDefinition.getConfig().size()); + assertEquals(1, topologyDefinition.getConfig().get("topology.workers")); + + BoltDefinition bolt1 = topologyDefinition.getBolt(BOLT_1); + assertNotNull(bolt1); + assertEquals(2, bolt1.getParallelism()); + assertEquals("com.twitter.heron.sample.TestWordCounter", bolt1.getClassName()); + assertEquals(BOLT_1, bolt1.getId()); + + + BoltDefinition bolt2 = topologyDefinition.getBolt(BOLT_2); + assertEquals(1, bolt2.getParallelism()); + assertEquals("com.twitter.heron.sample.LogInfoBolt", bolt2.getClassName()); + assertEquals(BOLT_2, bolt2.getId()); + + List streamDefinitions = topologyDefinition.getStreams(); + StreamDefinition streamDefinitionOne = streamDefinitions.get(0); + GroupingDefinition groupingDefinitionOne = streamDefinitionOne.getGrouping(); + StreamDefinition streamDefinitionTwo = streamDefinitions.get(1); + GroupingDefinition groupingDefinitionTwo = streamDefinitionTwo.getGrouping(); + + assertEquals(2, streamDefinitions.size()); + + assertEquals(BOLT_1, streamDefinitionOne.getTo()); + assertEquals("spout-1", streamDefinitionOne.getFrom()); + assertEquals(GroupingDefinition.Type.FIELDS, groupingDefinitionOne.getType()); + assertEquals(1, groupingDefinitionOne.getArgs().size()); + assertEquals("word", groupingDefinitionOne.getArgs().get(0)); + assertEquals("connection-1", streamDefinitionOne.getId()); + + assertEquals(BOLT_2, streamDefinitionTwo.getTo()); + assertEquals("bolt-1", streamDefinitionTwo.getFrom()); + assertEquals(GroupingDefinition.Type.SHUFFLE, groupingDefinitionTwo.getType()); + assertEquals("connection-2", streamDefinitionTwo.getId()); + assertNull(groupingDefinitionTwo.getArgs()); + + } + + @Test + public void testPartFromInputStream_NoConfigSpecified_ConfigMapIsEmpty() throws Exception { + InputStream inputStream = new ByteArrayInputStream(YAML_NO_CONFIG_STR.getBytes()); + + EcoTopologyDefinition topologyDefinition = subject.parseFromInputStream(inputStream); + + assertNotNull(topologyDefinition.getConfig()); + assertEquals(0, topologyDefinition.getConfig().size()); + } + + @Test(expected = Exception.class) + public void testParseFromInputStream_StreamIsNull_ExceptionThrown() throws Exception { + InputStream inputStream = null; + EcoTopologyDefinition ecoTopologyDefinition = null; + + try { + ecoTopologyDefinition = subject.parseFromInputStream(inputStream); + } finally { + assertNull(ecoTopologyDefinition); + } + } +} diff --git a/heron/eco/tests/java/com/twitter/heron/eco/submit/EcoSubmitterTest.java b/heron/eco/tests/java/com/twitter/heron/eco/submit/EcoSubmitterTest.java new file mode 100644 index 00000000000..3291d4981de --- /dev/null +++ b/heron/eco/tests/java/com/twitter/heron/eco/submit/EcoSubmitterTest.java @@ -0,0 +1,55 @@ +// Copyright 2018 Twitter. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +package com.twitter.heron.eco.submit; + +import org.apache.storm.Config; +import org.apache.storm.StormSubmitter; +import org.apache.storm.generated.StormTopology; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.powermock.api.mockito.PowerMockito; +import org.powermock.core.classloader.annotations.PrepareForTest; +import org.powermock.modules.junit4.PowerMockRunner; + +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyString; +import static org.mockito.internal.verification.VerificationModeFactory.times; + +@RunWith(PowerMockRunner.class) +@PrepareForTest(StormSubmitter.class) +public class EcoSubmitterTest { + + private EcoSubmitter subject; + + @Before + public void setUpForEachTestCase() { + subject = new EcoSubmitter(); + } + + @Test + public void submitTopology_AllGood_BehavesAsExpected() + throws Exception { + Config config = new Config(); + StormTopology topology = new StormTopology(); + PowerMockito.spy(StormSubmitter.class); + PowerMockito.doNothing().when(StormSubmitter.class, "submitTopology", + any(String.class), any(Config.class), any(StormTopology.class)); + + subject.submitTopology("name", config, topology); + PowerMockito.verifyStatic(times(1)); + StormSubmitter.submitTopology(anyString(), any(Config.class), any(StormTopology.class)); + + } +} diff --git a/heron/metricsmgr/tests/java/com/twitter/heron/metricsmgr/executor/SinkExecutorTest.java b/heron/metricsmgr/tests/java/com/twitter/heron/metricsmgr/executor/SinkExecutorTest.java index 4c54dc3ce81..f767a0e1e5a 100644 --- a/heron/metricsmgr/tests/java/com/twitter/heron/metricsmgr/executor/SinkExecutorTest.java +++ b/heron/metricsmgr/tests/java/com/twitter/heron/metricsmgr/executor/SinkExecutorTest.java @@ -55,7 +55,7 @@ public class SinkExecutorTest { private static final String EXCEPTION_FIRST_TIME = "firstTime"; private static final String EXCEPTION_LOGGING = "logging"; private static final String RECORD_SOURCE = "source"; - private static final String RECORD_CONTEXT = "context"; + private static final String RECORD_CONTEXT = "ecoExecutionContext"; private volatile int processRecordInvoked = 0; private volatile int flushInvoked = 0; diff --git a/heron/spi/tests/java/com/twitter/heron/spi/metricsmgr/metrics/MetricsRecordTest.java b/heron/spi/tests/java/com/twitter/heron/spi/metricsmgr/metrics/MetricsRecordTest.java index dc696d1525d..36ad671350c 100644 --- a/heron/spi/tests/java/com/twitter/heron/spi/metricsmgr/metrics/MetricsRecordTest.java +++ b/heron/spi/tests/java/com/twitter/heron/spi/metricsmgr/metrics/MetricsRecordTest.java @@ -25,7 +25,7 @@ public class MetricsRecordTest { private static final int N = 100; private static final String SOURCE = "source"; - private static final String CONTEXT = "context"; + private static final String CONTEXT = "ecoExecutionContext"; private final List records = new ArrayList(); @Before diff --git a/scripts/get_all_heron_paths.sh b/scripts/get_all_heron_paths.sh index 5ea0e68be5f..ed40c02048d 100755 --- a/scripts/get_all_heron_paths.sh +++ b/scripts/get_all_heron_paths.sh @@ -67,7 +67,7 @@ function get_package_of() { } function get_heron_java_paths() { - local java_paths=$(find {heron,heron/tools,tools,integration_test,contrib} -name "*.java" | sed "s|/src/java/.*$|/src/java|"| sed "s|/java/src/.*$|/java/src|" | sed "s|/tests/java/.*$|/tests/java|" | sort -u | fgrep -v "heron/scheduler/" | fgrep -v "heron/scheduler/" ) + local java_paths=$(find {heron,heron/tools,tools,integration_test,storm-compatibility,contrib} -name "*.java" | sed "s|/src/java/.*$|/src/java|"| sed "s|/java/src/.*$|/java/src|" | sed "s|/tests/java/.*$|/tests/java|" | sort -u | fgrep -v "heron/scheduler/" | fgrep -v "heron/scheduler/" ) if [ "$(uname -s | tr 'A-Z' 'a-z')" != "darwin" ]; then java_paths=$(echo "${java_paths}" | fgrep -v "/objc_tools/") fi