diff --git a/heron/api/src/java/com/twitter/heron/api/HeronSubmitter.java b/heron/api/src/java/com/twitter/heron/api/HeronSubmitter.java index 1048f9e7a87..719df40479c 100644 --- a/heron/api/src/java/com/twitter/heron/api/HeronSubmitter.java +++ b/heron/api/src/java/com/twitter/heron/api/HeronSubmitter.java @@ -52,10 +52,10 @@ * with the "heron jar" command from the command-line, and then use this class to * submit your topologies. */ -public final class HeronSubmitter { +public final class HeronSubmitter implements TopologySubmitter { private static final Logger LOG = Logger.getLogger(HeronSubmitter.class.getName()); - private HeronSubmitter() { + public HeronSubmitter() { } /** @@ -70,6 +70,11 @@ private HeronSubmitter() { */ public static void submitTopology(String name, Config heronConfig, HeronTopology topology) throws AlreadyAliveException, InvalidTopologyException { + new HeronSubmitter().submitTopologyInherited(name, heronConfig, topology); + } + + public void submitTopologyInherited(String name, Config heronConfig, HeronTopology topology) + throws AlreadyAliveException, InvalidTopologyException { Map heronCmdOptions = Utils.readCommandLineOpts(); // We would read the topology initial state from arguments from heron-cli diff --git a/heron/api/src/java/com/twitter/heron/api/TopologySubmitter.java b/heron/api/src/java/com/twitter/heron/api/TopologySubmitter.java new file mode 100644 index 00000000000..88952785b72 --- /dev/null +++ b/heron/api/src/java/com/twitter/heron/api/TopologySubmitter.java @@ -0,0 +1,22 @@ +// Copyright 2018 Twitter. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +package com.twitter.heron.api; + +import com.twitter.heron.api.exception.AlreadyAliveException; +import com.twitter.heron.api.exception.InvalidTopologyException; + +public interface TopologySubmitter { + void submitTopologyInherited(String name, Config heronConfig, HeronTopology topology) + throws AlreadyAliveException, InvalidTopologyException; +} diff --git a/heron/api/src/java/com/twitter/heron/streamlet/Runner.java b/heron/api/src/java/com/twitter/heron/streamlet/Runner.java index 05c572dfbab..091c507872a 100644 --- a/heron/api/src/java/com/twitter/heron/streamlet/Runner.java +++ b/heron/api/src/java/com/twitter/heron/streamlet/Runner.java @@ -15,6 +15,7 @@ package com.twitter.heron.streamlet; import com.twitter.heron.api.HeronSubmitter; +import com.twitter.heron.api.TopologySubmitter; import com.twitter.heron.api.exception.AlreadyAliveException; import com.twitter.heron.api.exception.InvalidTopologyException; import com.twitter.heron.api.topology.TopologyBuilder; @@ -34,11 +35,18 @@ public Runner() { } * @param builder The builder used to keep track of the sources. */ public void run(String name, Config config, Builder builder) { + this.run(name, config, builder, new HeronSubmitter()); + } + + public void run(String name, Config config, Builder builder, TopologySubmitter submitter) { BuilderImpl bldr = (BuilderImpl) builder; TopologyBuilder topologyBuilder = bldr.build(); try { - HeronSubmitter.submitTopology(name, config.getHeronConfig(), - topologyBuilder.createTopology()); + submitter.submitTopologyInherited( + name, + config.getHeronConfig(), + topologyBuilder.createTopology() + ); } catch (AlreadyAliveException | InvalidTopologyException e) { e.printStackTrace(); } diff --git a/heron/simulator/src/java/com/twitter/heron/simulator/Simulator.java b/heron/simulator/src/java/com/twitter/heron/simulator/Simulator.java index 9053a21f03a..7dd43e54e2d 100644 --- a/heron/simulator/src/java/com/twitter/heron/simulator/Simulator.java +++ b/heron/simulator/src/java/com/twitter/heron/simulator/Simulator.java @@ -24,6 +24,7 @@ import com.twitter.heron.api.Config; import com.twitter.heron.api.HeronTopology; +import com.twitter.heron.api.TopologySubmitter; import com.twitter.heron.api.generated.TopologyAPI; import com.twitter.heron.api.utils.TopologyUtils; import com.twitter.heron.common.basics.ByteAmount; @@ -40,7 +41,7 @@ * One Simulator instance can only submit one topology. Please have multiple Simulator instances * for multiple topologies. */ -public class Simulator { +public class Simulator implements TopologySubmitter { private static final Logger LOG = Logger.getLogger(Simulator.class.getName()); private final List instanceExecutors = new LinkedList<>(); @@ -96,6 +97,12 @@ protected void registerSystemConfig(SystemConfig sysConfig) { SingletonRegistry.INSTANCE.registerSingleton(SystemConfig.HERON_SYSTEM_CONFIG, sysConfig); } + public void submitTopologyInherited(String name, + Config heronConfig, + HeronTopology heronTopology) { + this.submitTopology(name, heronConfig, heronTopology); + } + /** * Submit and run topology in simulator * @param name topology name